diff --git a/src/mito2/src/engine/skip_wal_test.rs b/src/mito2/src/engine/skip_wal_test.rs index c59be6ba2c..97f159b8ac 100644 --- a/src/mito2/src/engine/skip_wal_test.rs +++ b/src/mito2/src/engine/skip_wal_test.rs @@ -171,6 +171,48 @@ async fn test_close_follower_region_skip_wal() { assert_eq!(0, total_rows); } +#[tokio::test] +async fn test_close_follower_region_skip_wal_with_pending_data() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("close-follower-skip-wal-pending-data").await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let mut request = CreateRequestBuilder::new().build(); + + let wal_options = WalOptions::Noop; + request.options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&wal_options).unwrap(), + ); + + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + let rows = Rows { + schema: rows_schema(&request), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + let region = engine.get_region(region_id).unwrap(); + assert!(!region.version().memtables.is_empty()); + + engine + .set_region_role(region_id, RegionRole::Follower) + .unwrap(); + assert!(region.is_follower()); + + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + assert!(!engine.is_region_exists(region_id)); +} + #[tokio::test] async fn test_close_region_after_truncate_skip_wal() { common_telemetry::init_default_ut_logging(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 9ae748205e..66e3838895 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1241,6 +1241,18 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Region {} is in {:?} state, expect: Writable, Staging or Downgrading", + region_id, + state + ))] + FlushableRegionState { + region_id: RegionId, + state: RegionRoleState, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1430,6 +1442,8 @@ impl ErrorExt for Error { TooManyFilesToRead { .. } | TooManyGcJobs { .. } => StatusCode::RateLimited, PruneFile { source, .. } => source.status_code(), + + FlushableRegionState { .. } => StatusCode::RegionNotReady, } } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index a898876dcf..c85599bf58 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -45,8 +45,8 @@ pub use utils::*; use crate::access_layer::AccessLayerRef; use crate::error::{ - InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result, - UnexpectedSnafu, UpdateManifestSnafu, + FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, + RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu, }; use crate::manifest::action::{ RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList, @@ -1435,35 +1435,19 @@ impl RegionMap { /// Gets flushable region by region id. /// - /// Returns error if the region does not exist. - /// Returns None if the region exists but not operatable. - fn flushable_region(&self, region_id: RegionId) -> Result> { + /// Returns error if the region does not exist or not flushable. + pub(crate) fn flushable_region(&self, region_id: RegionId) -> Result { let region = self .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; - if region.is_flushable() { - Ok(Some(region)) - } else { - Ok(None) - } - } - - /// Gets flushable region by region id. - /// - /// Calls the callback if the region does not exist. - /// Returns None if the region exists but not operatable. - pub(crate) fn flushable_region_or( - &self, - region_id: RegionId, - cb: &mut F, - ) -> Option { - match self.flushable_region(region_id) { - Ok(region) => region, - Err(e) => { - cb.on_failure(e); - None + ensure!( + region.is_flushable(), + FlushableRegionStateSnafu { + region_id, + state: region.state(), } - } + ); + Ok(region) } /// Remove region by id. diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 1abd772f9f..3e0f5e52a6 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -37,8 +37,8 @@ impl RegionWorkerLoop { info!("Try to close region {}, worker: {}", region_id, self.id); - // If the region is using Noop WAL and has data in memtable, - // we should flush it before closing to ensure durability. + // If the region is using Noop WAL and has data in memtable and region is flushable (like, + // not in follower state), we should flush it before closing to ensure durability. if region.provider == Provider::Noop && !region .version_control @@ -46,6 +46,7 @@ impl RegionWorkerLoop { .version .memtables .is_empty() + && region.is_flushable() { info!("Region {} has pending data, waiting for flush", region_id); self.handle_flush_request( diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index fce995134d..cec922a6e1 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -182,11 +182,16 @@ impl RegionWorkerLoop { region_id: RegionId, request: RegionFlushRequest, reason: Option, - mut sender: OptionOutputTx, + sender: OptionOutputTx, ) { - let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else { - return; + let region = match self.regions.flushable_region(region_id) { + Ok(region) => region, + Err(e) => { + sender.send(Err(e)); + return; + } }; + // `update_topic_latest_entry_id` updates `topic_latest_entry_id` when memtables are empty. // But the flush is skipped if memtables are empty. Thus should update the `topic_latest_entry_id` // when handling flush request instead of in `schedule_flush` or `flush_finished`.