From 2f39a7713742962b43d10be5d0fc3462f4ed8bbe Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 17 Feb 2023 11:32:55 +0800 Subject: [PATCH] feat: add close method for the region trait (#970) feat: add close for region trait --- src/mito/src/table/test_util/mock_engine.rs | 8 +- src/storage/src/engine.rs | 6 +- src/storage/src/error.rs | 11 ++ src/storage/src/region.rs | 8 ++ src/storage/src/region/tests.rs | 10 +- src/storage/src/region/tests/close.rs | 147 ++++++++++++++++++++ src/storage/src/region/tests/flush.rs | 41 +----- src/storage/src/region/writer.rs | 61 +++++++- src/storage/src/test_util.rs | 1 + src/storage/src/test_util/flush_switch.rs | 53 +++++++ src/store-api/src/storage/region.rs | 2 + 11 files changed, 303 insertions(+), 45 deletions(-) create mode 100644 src/storage/src/region/tests/close.rs create mode 100644 src/storage/src/test_util/flush_switch.rs diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index c84afd2d65..af83d927e8 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -192,6 +192,10 @@ impl Region for MockRegion { Ok(()) } + + async fn close(&self) -> Result<()> { + Ok(()) + } } impl MockRegionInner { @@ -279,8 +283,8 @@ impl StorageEngine for MockEngine { return Ok(None); } - async fn close_region(&self, _ctx: &EngineContext, _region: MockRegion) -> Result<()> { - unimplemented!() + async fn close_region(&self, _ctx: &EngineContext, region: MockRegion) -> Result<()> { + region.close().await } async fn create_region( diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index c1dbf8968f..0f8bd36c83 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -21,7 +21,7 @@ use object_store::{util, ObjectStore}; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::storage::{ - CreateOptions, EngineContext, OpenOptions, RegionDescriptor, StorageEngine, + CreateOptions, EngineContext, OpenOptions, Region, RegionDescriptor, StorageEngine, }; use crate::background::JobPoolImpl; @@ -62,8 +62,8 @@ impl StorageEngine for EngineImpl { self.inner.open_region(name, opts).await } - async fn close_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> { - unimplemented!() + async fn close_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> { + region.close().await } async fn create_region( diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 0d0bf7ef85..14d0e0a4bf 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -141,6 +141,12 @@ pub enum Error { #[snafu(display("Task already cancelled"))] Cancelled { backtrace: Backtrace }, + #[snafu(display("Failed to cancel flush, source: {}", source))] + CancelFlush { + #[snafu(backtrace)] + source: BoxedError, + }, + #[snafu(display( "Manifest protocol forbid to read, min_version: {}, supported_version: {}", min_version, @@ -232,6 +238,9 @@ pub enum Error { source: MetadataError, }, + #[snafu(display("Try to write the closed region"))] + ClosedRegion { backtrace: Backtrace }, + #[snafu(display("Invalid projection, source: {}", source))] InvalidProjection { #[snafu(backtrace)] @@ -443,12 +452,14 @@ impl ErrorExt for Error { | DecodeJson { .. } | JoinTask { .. } | Cancelled { .. } + | CancelFlush { .. } | DecodeMetaActionList { .. } | Readline { .. } | WalDataCorrupted { .. } | SequenceNotMonotonic { .. } | ConvertStoreSchema { .. } | InvalidRawRegion { .. } + | ClosedRegion { .. } | FilterColumn { .. } | AlterMetadata { .. } | CompatRead { .. } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 558674ce7c..090d711b77 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -103,6 +103,10 @@ impl Region for RegionImpl { async fn alter(&self, request: AlterRequest) -> Result<()> { self.inner.alter(request).await } + + async fn close(&self) -> Result<()> { + self.inner.close().await + } } /// Storage related config for region. @@ -509,4 +513,8 @@ impl RegionInner { self.writer.alter(alter_ctx, request).await } + + async fn close(&self) -> Result<()> { + self.writer.close().await + } } diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index a7771e05f2..1041facb56 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -16,6 +16,7 @@ mod alter; mod basic; +mod close; mod flush; mod projection; @@ -75,6 +76,13 @@ impl TesterBase { /// /// Format of data: (timestamp, v0), timestamp is key, v0 is value. pub async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { + self.try_put(data).await.unwrap() + } + + /// Put without version specified, returns [`Result`] + /// + /// Format of data: (timestamp, v0), timestamp is key, v0 is value. + pub async fn try_put(&self, data: &[(i64, Option)]) -> Result { let data: Vec<(TimestampMillisecond, Option)> = data.iter().map(|(l, r)| ((*l).into(), *r)).collect(); // Build a batch without version. @@ -82,7 +90,7 @@ impl TesterBase { let put_data = new_put_data(&data); batch.put(put_data).unwrap(); - self.region.write(&self.write_ctx, batch).await.unwrap() + self.region.write(&self.write_ctx, batch).await } /// Put without version specified directly to inner writer. diff --git a/src/storage/src/region/tests/close.rs b/src/storage/src/region/tests/close.rs new file mode 100644 index 0000000000..b166813a7a --- /dev/null +++ b/src/storage/src/region/tests/close.rs @@ -0,0 +1,147 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Region close tests. + +use std::sync::Arc; + +use log_store::raft_engine::log_store::RaftEngineLogStore; +use store_api::storage::{AlterOperation, AlterRequest, Region, RegionMeta, WriteResponse}; +use tempdir::TempDir; + +use crate::engine; +use crate::error::Error; +use crate::flush::FlushStrategyRef; +use crate::region::tests::{self, FileTesterBase}; +use crate::region::RegionImpl; +use crate::test_util::config_util; +use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch}; + +const REGION_NAME: &str = "region-close-0"; + +/// Tester for region close +struct CloseTester { + base: Option, +} + +/// Create a new region for flush test +async fn create_region_for_close( + store_dir: &str, + enable_version_column: bool, + flush_strategy: FlushStrategyRef, +) -> RegionImpl { + let metadata = tests::new_metadata(REGION_NAME, enable_version_column); + + let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await; + store_config.flush_strategy = flush_strategy; + + RegionImpl::create(metadata, store_config).await.unwrap() +} + +impl CloseTester { + async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> CloseTester { + let region = create_region_for_close(store_dir, false, flush_strategy.clone()).await; + + CloseTester { + base: Some(FileTesterBase::with_region(region)), + } + } + + #[inline] + fn base(&self) -> &FileTesterBase { + self.base.as_ref().unwrap() + } + + async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { + self.base().put(data).await + } + + async fn try_put(&self, data: &[(i64, Option)]) -> Result { + self.base().try_put(data).await + } + + async fn try_alter(&self, mut req: AlterRequest) -> Result<(), Error> { + let version = self.version(); + req.version = version; + + self.base().region.alter(req).await + } + + fn version(&self) -> u32 { + let metadata = self.base().region.in_memory_metadata(); + metadata.version() + } +} + +#[tokio::test] +async fn test_close_basic() { + common_telemetry::init_default_ut_logging(); + let dir = TempDir::new("close-basic").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + + let flush_switch = Arc::new(FlushSwitch::default()); + let tester = CloseTester::new(store_dir, flush_switch).await; + + tester.base().region.close().await.unwrap(); + + let data = [(1000, Some(100))]; + + let closed_region_error = "Try to write the closed region".to_string(); + // Put one element should return ClosedRegion error + assert_eq!( + tester.try_put(&data).await.unwrap_err().to_string(), + closed_region_error + ); + + // Alter table should return ClosedRegion error + assert_eq!( + tester + .try_alter(AlterRequest { + operation: AlterOperation::AddColumns { + columns: Vec::new(), + }, + version: 0, + }) + .await + .unwrap_err() + .to_string(), + closed_region_error + ); +} + +#[tokio::test] +async fn test_close_wait_flush_done() { + common_telemetry::init_default_ut_logging(); + let dir = TempDir::new("close-basic").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + + let flush_switch = Arc::new(FlushSwitch::default()); + let tester = CloseTester::new(store_dir, flush_switch.clone()).await; + + let data = [(1000, Some(100))]; + + // Now set should flush to true to trigger flush. + flush_switch.set_should_flush(true); + + // Put one element so we have content to flush. + tester.put(&data).await; + + let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); + assert!(!has_parquet_file(&sst_dir)); + + // Close should cancel the flush. + tester.base().region.close().await.unwrap(); + + assert!(!has_parquet_file(&sst_dir)); +} diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index 662edb781b..f22aca90c5 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -14,7 +14,6 @@ //! Region flush tests. -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use log_store::raft_engine::log_store::RaftEngineLogStore; @@ -22,10 +21,11 @@ use store_api::storage::{OpenOptions, WriteResponse}; use tempdir::TempDir; use crate::engine; -use crate::flush::{FlushStrategy, FlushStrategyRef}; +use crate::flush::FlushStrategyRef; use crate::region::tests::{self, FileTesterBase}; -use crate::region::{RegionImpl, SharedDataRef}; +use crate::region::RegionImpl; use crate::test_util::config_util; +use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch}; const REGION_NAME: &str = "region-flush-0"; @@ -96,41 +96,6 @@ impl FlushTester { } } -#[derive(Debug, Default)] -struct FlushSwitch { - should_flush: AtomicBool, -} - -impl FlushSwitch { - fn set_should_flush(&self, should_flush: bool) { - self.should_flush.store(should_flush, Ordering::Relaxed); - } -} - -impl FlushStrategy for FlushSwitch { - fn should_flush( - &self, - _shared: &SharedDataRef, - _bytes_mutable: usize, - _bytes_total: usize, - ) -> bool { - self.should_flush.load(Ordering::Relaxed) - } -} - -fn has_parquet_file(sst_dir: &str) -> bool { - for entry in std::fs::read_dir(sst_dir).unwrap() { - let entry = entry.unwrap(); - let path = entry.path(); - if !path.is_dir() { - assert_eq!("parquet", path.extension().unwrap()); - return true; - } - } - - false -} - #[tokio::test] async fn test_flush_and_stall() { common_telemetry::init_default_ut_logging(); diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 678ec20b5f..188161f99b 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -14,10 +14,11 @@ use std::sync::Arc; +use common_error::prelude::BoxedError; use common_telemetry::tracing::log::info; use common_telemetry::{error, logging}; use futures::TryStreamExt; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::manifest::{Manifest, ManifestVersion, MetaAction}; use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteResponse}; @@ -74,6 +75,9 @@ impl RegionWriter { writer_ctx: WriterContext<'_, S>, ) -> Result { let mut inner = self.inner.lock().await; + + ensure!(!inner.is_closed(), error::ClosedRegionSnafu); + inner .write(&self.version_mutex, ctx, request, writer_ctx) .await @@ -155,6 +159,8 @@ impl RegionWriter { // alter request to the region. let inner = self.inner.lock().await; + ensure!(!inner.is_closed(), error::ClosedRegionSnafu); + let version_control = alter_ctx.version_control(); let old_metadata = version_control.metadata(); @@ -225,6 +231,43 @@ impl RegionWriter { Ok(()) } + + pub async fn close(&self) -> Result<()> { + // In order to close a writer + // 1. Acquires the write lock. + // 2. Sets a memory flag to reject any potential writing. + // 3. Waits for the pending flush task. + { + let mut inner = self.inner.lock().await; + + if inner.is_closed() { + return Ok(()); + } + + inner.mark_closed(); + } + // we release the writer lock once for rejecting any following potential writing requests immediately. + + self.cancel_flush().await?; + + // TODO: canncel the compaction task + + Ok(()) + } + + /// Cancel flush task if any + async fn cancel_flush(&self) -> Result<()> { + let mut inner = self.inner.lock().await; + + if let Some(task) = inner.flush_handle.take() { + task.cancel() + .await + .map_err(BoxedError::new) + .context(error::CancelFlushSnafu)?; + } + + Ok(()) + } } // Private methods for tests. @@ -275,6 +318,11 @@ impl<'a, S: LogStore> AlterContext<'a, S> { struct WriterInner { memtable_builder: MemtableBuilderRef, flush_handle: Option, + + /// `WriterInner` will reject any future writing, if the closed flag is set. + /// + /// It should protected by upper mutex + closed: bool, engine_config: Arc, } @@ -284,6 +332,7 @@ impl WriterInner { memtable_builder, flush_handle: None, engine_config, + closed: false, } } @@ -617,4 +666,14 @@ impl WriterInner { }); Some(schedule_compaction_cb) } + + #[inline] + fn is_closed(&self) -> bool { + self.closed + } + + #[inline] + fn mark_closed(&mut self) { + self.closed = true; + } } diff --git a/src/storage/src/test_util.rs b/src/storage/src/test_util.rs index 8952408fdf..bdf9483854 100644 --- a/src/storage/src/test_util.rs +++ b/src/storage/src/test_util.rs @@ -14,6 +14,7 @@ pub mod config_util; pub mod descriptor_util; +pub mod flush_switch; pub mod read_util; pub mod schema_util; pub mod write_batch_util; diff --git a/src/storage/src/test_util/flush_switch.rs b/src/storage/src/test_util/flush_switch.rs new file mode 100644 index 0000000000..6d1b7df9cc --- /dev/null +++ b/src/storage/src/test_util/flush_switch.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicBool, Ordering}; + +use crate::flush::FlushStrategy; +use crate::region::SharedDataRef; + +#[derive(Debug, Default)] +pub struct FlushSwitch { + should_flush: AtomicBool, +} + +impl FlushSwitch { + pub fn set_should_flush(&self, should_flush: bool) { + self.should_flush.store(should_flush, Ordering::Relaxed); + } +} + +impl FlushStrategy for FlushSwitch { + fn should_flush( + &self, + _shared: &SharedDataRef, + _bytes_mutable: usize, + _bytes_total: usize, + ) -> bool { + self.should_flush.load(Ordering::Relaxed) + } +} + +pub fn has_parquet_file(sst_dir: &str) -> bool { + for entry in std::fs::read_dir(sst_dir).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if !path.is_dir() { + assert_eq!("parquet", path.extension().unwrap()); + return true; + } + } + + false +} diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index a8a732469b..032fbc5c2f 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -72,6 +72,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { fn write_request(&self) -> Self::WriteRequest; async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>; + + async fn close(&self) -> Result<(), Self::Error>; } /// Context for write operations.