diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 49bd39d3cc..676e2c17ad 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -65,6 +65,8 @@ mod scan_test; #[cfg(test)] mod set_role_state_test; #[cfg(test)] +mod skip_wal_test; +#[cfg(test)] mod staging_test; #[cfg(test)] mod sync_test; diff --git a/src/mito2/src/engine/skip_wal_test.rs b/src/mito2/src/engine/skip_wal_test.rs new file mode 100644 index 0000000000..d1b38c47fb --- /dev/null +++ b/src/mito2/src/engine/skip_wal_test.rs @@ -0,0 +1,170 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::Rows; +use common_wal::options::{WAL_OPTIONS_KEY, WalOptions}; +use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_request::{RegionCloseRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema}; + +#[tokio::test] +async fn test_close_region_skip_wal_with_pending_data() { + test_close_region_skip_wal(true).await; +} + +#[tokio::test] +async fn test_close_region_skip_wal_without_pending_data() { + test_close_region_skip_wal(false).await; +} + +async fn test_close_region_skip_wal(insert: bool) { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix(&format!("close-skip-wal-{}", insert)).await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let mut request = CreateRequestBuilder::new().build(); + + // Set skip_wal = true via WalOptions::Noop + let wal_options = WalOptions::Noop; + request.options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&wal_options).unwrap(), + ); + + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + if insert { + let column_schemas = rows_schema(&request); + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + } + + // The region should have data in memtable. + let region = engine.get_region(region_id).unwrap(); + if insert { + assert!(!region.version().memtables.is_empty()); + } else { + assert!(region.version().memtables.is_empty()); + } + + // Close the region. This should trigger a flush. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + // After closing, we reopen it and check if data is persisted. + engine + .handle_request( + region_id, + RegionRequest::Open(store_api::region_request::RegionOpenRequest { + engine: String::new(), + table_dir: request.table_dir.clone(), + path_type: store_api::region_request::PathType::Bare, + options: request.options.clone(), + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + let scan_request = ScanRequest::default(); + let stream = engine + .scan_to_stream(region_id, scan_request) + .await + .unwrap(); + let batches = common_recordbatch::RecordBatches::try_collect(stream) + .await + .unwrap(); + // If flush was triggered, data should be there even though WAL was skipped. + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + if insert { + assert_eq!(3, total_rows); + } else { + assert_eq!(0, total_rows); + } +} + +#[tokio::test] +async fn test_close_follower_region_skip_wal() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("close-follower-skip-wal").await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let mut request = CreateRequestBuilder::new().build(); + + // Set skip_wal = true via WalOptions::Noop + let wal_options = WalOptions::Noop; + request.options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&wal_options).unwrap(), + ); + + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + + // Set the region to Follower state. + engine + .set_region_role(region_id, RegionRole::Follower) + .unwrap(); + assert!(region.is_follower()); + + // Close the region. This should trigger a flush. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + // After closing, we reopen it and check if data is persisted. + engine + .handle_request( + region_id, + RegionRequest::Open(store_api::region_request::RegionOpenRequest { + engine: String::new(), + table_dir: request.table_dir.clone(), + path_type: store_api::region_request::PathType::Bare, + options: request.options.clone(), + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + let scan_request = ScanRequest::default(); + let stream = engine + .scan_to_stream(region_id, scan_request) + .await + .unwrap(); + let batches = common_recordbatch::RecordBatches::try_collect(stream) + .await + .unwrap(); + // If flush was triggered, data should be there even though WAL was skipped. + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(0, total_rows); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index f4fd786df3..f6385a292a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -553,18 +553,6 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Region {} is in {:?} state, expect: Leader or Leader(Downgrading)", - region_id, - state - ))] - FlushableRegionState { - region_id: RegionId, - state: RegionRoleState, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid options"))] JsonOptions { #[snafu(source)] @@ -1312,7 +1300,6 @@ impl ErrorExt for Error { CompatReader { .. } => StatusCode::Unexpected, InvalidRegionRequest { source, .. } => source.status_code(), RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady, - FlushableRegionState { .. } => StatusCode::RegionNotReady, JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index c769e569aa..1e99d4a895 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -219,6 +219,8 @@ pub enum FlushReason { Downgrading, /// Enter staging mode. EnterStaging, + /// Flush when region is closing. + Closing, } impl FlushReason { @@ -320,6 +322,7 @@ impl RegionFlushTask { edit, memtables_to_remove, is_staging: self.is_staging, + flush_reason: self.reason, }; WorkerRequest::Background { region_id: self.region_id, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 8d97fcd166..79f00cdfed 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -44,8 +44,8 @@ pub use utils::*; use crate::access_layer::AccessLayerRef; use crate::error::{ - FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, - RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu, + InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result, + UnexpectedSnafu, UpdateManifestSnafu, }; use crate::manifest::action::{ RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList, @@ -1163,31 +1163,30 @@ impl RegionMap { /// Gets flushable region by region id. /// - /// Returns error if the region does not exist or is not operable. - fn flushable_region(&self, region_id: RegionId) -> Result { + /// Returns error if the region does not exist. + /// Returns None if the region exists but not operatable. + fn flushable_region(&self, region_id: RegionId) -> Result> { let region = self .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; - ensure!( - region.is_flushable(), - FlushableRegionStateSnafu { - region_id, - state: region.state(), - } - ); - Ok(region) + if region.is_flushable() { + Ok(Some(region)) + } else { + Ok(None) + } } /// Gets flushable region by region id. /// - /// Calls the callback if the region does not exist or is not operable. + /// Calls the callback if the region does not exist. + /// Returns None if the region exists but not operatable. pub(crate) fn flushable_region_or( &self, region_id: RegionId, cb: &mut F, ) -> Option { match self.flushable_region(region_id) { - Ok(region) => Some(region), + Ok(region) => region, Err(e) => { cb.on_failure(e); None @@ -1196,9 +1195,9 @@ impl RegionMap { } /// Remove region by id. - pub(crate) fn remove_region(&self, region_id: RegionId) { + pub(crate) fn remove_region(&self, region_id: RegionId) -> Option { let mut regions = self.regions.write().unwrap(); - regions.remove(®ion_id); + regions.remove(®ion_id) } /// List all regions. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 334c1825b0..8f96f4013b 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -50,6 +50,7 @@ use crate::error::{ FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu, Result, UnexpectedSnafu, }; +use crate::flush::FlushReason; use crate::manifest::action::{RegionEdit, TruncateKind}; use crate::memtable::MemtableId; use crate::memtable::bulk::part::BulkPart; @@ -914,6 +915,8 @@ pub(crate) struct FlushFinished { pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>, /// Whether the region is in staging mode. pub(crate) is_staging: bool, + /// Reason for flush. + pub(crate) flush_reason: FlushReason, } impl FlushFinished { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 343ff5b55c..41678cdf58 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -1069,14 +1069,17 @@ impl RegionWorkerLoop { .await; continue; } - DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await, + DdlRequest::Close(_) => { + self.handle_close_request(ddl.region_id, ddl.sender).await; + continue; + } DdlRequest::Alter(req) => { self.handle_alter_request(ddl.region_id, req, ddl.sender) .await; continue; } DdlRequest::Flush(req) => { - self.handle_flush_request(ddl.region_id, req, ddl.sender); + self.handle_flush_request(ddl.region_id, req, None, ddl.sender); continue; } DdlRequest::Compact(req) => { diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 1568ae0799..84efc283de 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -15,36 +15,68 @@ //! Handling close request. use common_telemetry::info; -use store_api::region_request::AffectedRows; +use store_api::logstore::LogStore; +use store_api::logstore::provider::Provider; +use store_api::region_request::RegionFlushRequest; use store_api::storage::RegionId; -use crate::error::Result; +use crate::flush::FlushReason; +use crate::request::OptionOutputTx; use crate::worker::RegionWorkerLoop; -impl RegionWorkerLoop { +impl RegionWorkerLoop { pub(crate) async fn handle_close_request( &mut self, region_id: RegionId, - ) -> Result { + sender: OptionOutputTx, + ) { let Some(region) = self.regions.get_region(region_id) else { - return Ok(0); + sender.send(Ok(0)); + return; }; info!("Try to close region {}, worker: {}", region_id, self.id); + // If the region is using Noop WAL and has data in memtable, + // we should flush it before closing to ensure durability. + if region.provider == Provider::Noop + && !region + .version_control + .current() + .version + .memtables + .is_empty() + { + info!("Region {} has pending data, waiting for flush", region_id); + self.handle_flush_request( + region_id, + RegionFlushRequest { + row_group_size: None, + }, + Some(FlushReason::Closing), + sender, + ); + return; + } + + // WAL configured or memtable is empty, flush is not necessary. + self.remove_region(region_id).await; + info!("Region {} closed, worker: {}", region_id, self.id); + sender.send(Ok(0)) + } + + /// Remove a region and stop all related tasks. + pub(crate) async fn remove_region(&mut self, region_id: RegionId) { + let Some(region) = self.regions.remove_region(region_id) else { + return; + }; region.stop().await; - self.regions.remove_region(region_id); // Clean flush status. self.flush_scheduler.on_region_closed(region_id); // Clean compaction status. self.compaction_scheduler.on_region_closed(region_id); // clean index build status. self.index_build_scheduler.on_region_closed(region_id).await; - - info!("Region {} closed, worker: {}", region_id, self.id); - self.region_count.dec(); - - Ok(0) } } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index c39f429323..a8d88ed846 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -165,6 +165,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionFlushRequest, + reason: Option, mut sender: OptionOutputTx, ) { let Some(region) = self.regions.flushable_region_or(region_id, &mut sender) else { @@ -175,11 +176,13 @@ impl RegionWorkerLoop { // when handling flush request instead of in `schedule_flush` or `flush_finished`. self.update_topic_latest_entry_id(®ion); - let reason = if region.is_downgrading() { - FlushReason::Downgrading - } else { - FlushReason::Manual - }; + let reason = reason.unwrap_or_else(|| { + if region.is_downgrading() { + FlushReason::Downgrading + } else { + FlushReason::Manual + } + }); let mut task = self.new_flush_task(®ion, reason, request.row_group_size, self.config.clone()); task.push_sender(sender); @@ -277,6 +280,7 @@ impl RegionWorkerLoop { return; } + let flush_on_close = request.flush_reason == FlushReason::Closing; let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add); // Notifies waiters and observes the flush timer. @@ -295,26 +299,30 @@ impl RegionWorkerLoop { .await; } - // Handle pending requests for the region. - if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) = - self.flush_scheduler.on_flush_success(region_id) - { - // Perform DDLs first because they require empty memtables. - self.handle_ddl_requests(&mut ddl_requests).await; - // Handle pending write requests, we don't stall these requests. - self.handle_write_requests(&mut write_requests, &mut bulk_writes, false) - .await; + if flush_on_close { + // Remove region from server for flush on closing, + // no need to handle requests and schedule compactions. + self.remove_region(region_id).await; + info!("Region {} closed after flush", region_id); + } else { + // Handle pending requests for the region. + if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) = + self.flush_scheduler.on_flush_success(region_id) + { + // Perform DDLs first because they require empty memtables. + self.handle_ddl_requests(&mut ddl_requests).await; + // Handle pending write requests, we don't stall these requests. + self.handle_write_requests(&mut write_requests, &mut bulk_writes, false) + .await; + } + // Maybe flush worker again. + self.maybe_flush_worker(); + // Handle stalled requests. + self.handle_stalled_requests().await; + // Schedules compaction. + self.schedule_compaction(®ion).await; } - // Maybe flush worker again. - self.maybe_flush_worker(); - - // Handle stalled requests. - self.handle_stalled_requests().await; - - // Schedules compaction. - self.schedule_compaction(®ion).await; - self.listener.on_flush_success(region_id); }