feat: add close method for the region trait (#970)

feat: add close for region trait
This commit is contained in:
Weny Xu
2023-02-17 11:32:55 +08:00
committed by GitHub
parent 16f86a9d77
commit 2f39a77137
11 changed files with 303 additions and 45 deletions

View File

@@ -192,6 +192,10 @@ impl Region for MockRegion {
Ok(())
}
async fn close(&self) -> Result<()> {
Ok(())
}
}
impl MockRegionInner {
@@ -279,8 +283,8 @@ impl StorageEngine for MockEngine {
return Ok(None);
}
async fn close_region(&self, _ctx: &EngineContext, _region: MockRegion) -> Result<()> {
unimplemented!()
async fn close_region(&self, _ctx: &EngineContext, region: MockRegion) -> Result<()> {
region.close().await
}
async fn create_region(

View File

@@ -21,7 +21,7 @@ use object_store::{util, ObjectStore};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::storage::{
CreateOptions, EngineContext, OpenOptions, RegionDescriptor, StorageEngine,
CreateOptions, EngineContext, OpenOptions, Region, RegionDescriptor, StorageEngine,
};
use crate::background::JobPoolImpl;
@@ -62,8 +62,8 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
self.inner.open_region(name, opts).await
}
async fn close_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
unimplemented!()
async fn close_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> {
region.close().await
}
async fn create_region(

View File

@@ -141,6 +141,12 @@ pub enum Error {
#[snafu(display("Task already cancelled"))]
Cancelled { backtrace: Backtrace },
#[snafu(display("Failed to cancel flush, source: {}", source))]
CancelFlush {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display(
"Manifest protocol forbid to read, min_version: {}, supported_version: {}",
min_version,
@@ -232,6 +238,9 @@ pub enum Error {
source: MetadataError,
},
#[snafu(display("Try to write the closed region"))]
ClosedRegion { backtrace: Backtrace },
#[snafu(display("Invalid projection, source: {}", source))]
InvalidProjection {
#[snafu(backtrace)]
@@ -443,12 +452,14 @@ impl ErrorExt for Error {
| DecodeJson { .. }
| JoinTask { .. }
| Cancelled { .. }
| CancelFlush { .. }
| DecodeMetaActionList { .. }
| Readline { .. }
| WalDataCorrupted { .. }
| SequenceNotMonotonic { .. }
| ConvertStoreSchema { .. }
| InvalidRawRegion { .. }
| ClosedRegion { .. }
| FilterColumn { .. }
| AlterMetadata { .. }
| CompatRead { .. }

View File

@@ -103,6 +103,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
async fn alter(&self, request: AlterRequest) -> Result<()> {
self.inner.alter(request).await
}
async fn close(&self) -> Result<()> {
self.inner.close().await
}
}
/// Storage related config for region.
@@ -509,4 +513,8 @@ impl<S: LogStore> RegionInner<S> {
self.writer.alter(alter_ctx, request).await
}
async fn close(&self) -> Result<()> {
self.writer.close().await
}
}

View File

@@ -16,6 +16,7 @@
mod alter;
mod basic;
mod close;
mod flush;
mod projection;
@@ -75,6 +76,13 @@ impl<S: LogStore> TesterBase<S> {
///
/// Format of data: (timestamp, v0), timestamp is key, v0 is value.
pub async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
self.try_put(data).await.unwrap()
}
/// Put without version specified, returns [`Result<WriteResponse>`]
///
/// Format of data: (timestamp, v0), timestamp is key, v0 is value.
pub async fn try_put(&self, data: &[(i64, Option<i64>)]) -> Result<WriteResponse> {
let data: Vec<(TimestampMillisecond, Option<i64>)> =
data.iter().map(|(l, r)| ((*l).into(), *r)).collect();
// Build a batch without version.
@@ -82,7 +90,7 @@ impl<S: LogStore> TesterBase<S> {
let put_data = new_put_data(&data);
batch.put(put_data).unwrap();
self.region.write(&self.write_ctx, batch).await.unwrap()
self.region.write(&self.write_ctx, batch).await
}
/// Put without version specified directly to inner writer.

View File

@@ -0,0 +1,147 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Region close tests.
use std::sync::Arc;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{AlterOperation, AlterRequest, Region, RegionMeta, WriteResponse};
use tempdir::TempDir;
use crate::engine;
use crate::error::Error;
use crate::flush::FlushStrategyRef;
use crate::region::tests::{self, FileTesterBase};
use crate::region::RegionImpl;
use crate::test_util::config_util;
use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch};
const REGION_NAME: &str = "region-close-0";
/// Tester for region close
struct CloseTester {
base: Option<FileTesterBase>,
}
/// Create a new region for flush test
async fn create_region_for_close(
store_dir: &str,
enable_version_column: bool,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<RaftEngineLogStore> {
let metadata = tests::new_metadata(REGION_NAME, enable_version_column);
let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
store_config.flush_strategy = flush_strategy;
RegionImpl::create(metadata, store_config).await.unwrap()
}
impl CloseTester {
async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> CloseTester {
let region = create_region_for_close(store_dir, false, flush_strategy.clone()).await;
CloseTester {
base: Some(FileTesterBase::with_region(region)),
}
}
#[inline]
fn base(&self) -> &FileTesterBase {
self.base.as_ref().unwrap()
}
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
self.base().put(data).await
}
async fn try_put(&self, data: &[(i64, Option<i64>)]) -> Result<WriteResponse, Error> {
self.base().try_put(data).await
}
async fn try_alter(&self, mut req: AlterRequest) -> Result<(), Error> {
let version = self.version();
req.version = version;
self.base().region.alter(req).await
}
fn version(&self) -> u32 {
let metadata = self.base().region.in_memory_metadata();
metadata.version()
}
}
#[tokio::test]
async fn test_close_basic() {
common_telemetry::init_default_ut_logging();
let dir = TempDir::new("close-basic").unwrap();
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let tester = CloseTester::new(store_dir, flush_switch).await;
tester.base().region.close().await.unwrap();
let data = [(1000, Some(100))];
let closed_region_error = "Try to write the closed region".to_string();
// Put one element should return ClosedRegion error
assert_eq!(
tester.try_put(&data).await.unwrap_err().to_string(),
closed_region_error
);
// Alter table should return ClosedRegion error
assert_eq!(
tester
.try_alter(AlterRequest {
operation: AlterOperation::AddColumns {
columns: Vec::new(),
},
version: 0,
})
.await
.unwrap_err()
.to_string(),
closed_region_error
);
}
#[tokio::test]
async fn test_close_wait_flush_done() {
common_telemetry::init_default_ut_logging();
let dir = TempDir::new("close-basic").unwrap();
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let tester = CloseTester::new(store_dir, flush_switch.clone()).await;
let data = [(1000, Some(100))];
// Now set should flush to true to trigger flush.
flush_switch.set_should_flush(true);
// Put one element so we have content to flush.
tester.put(&data).await;
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
assert!(!has_parquet_file(&sst_dir));
// Close should cancel the flush.
tester.base().region.close().await.unwrap();
assert!(!has_parquet_file(&sst_dir));
}

View File

@@ -14,7 +14,6 @@
//! Region flush tests.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -22,10 +21,11 @@ use store_api::storage::{OpenOptions, WriteResponse};
use tempdir::TempDir;
use crate::engine;
use crate::flush::{FlushStrategy, FlushStrategyRef};
use crate::flush::FlushStrategyRef;
use crate::region::tests::{self, FileTesterBase};
use crate::region::{RegionImpl, SharedDataRef};
use crate::region::RegionImpl;
use crate::test_util::config_util;
use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch};
const REGION_NAME: &str = "region-flush-0";
@@ -96,41 +96,6 @@ impl FlushTester {
}
}
#[derive(Debug, Default)]
struct FlushSwitch {
should_flush: AtomicBool,
}
impl FlushSwitch {
fn set_should_flush(&self, should_flush: bool) {
self.should_flush.store(should_flush, Ordering::Relaxed);
}
}
impl FlushStrategy for FlushSwitch {
fn should_flush(
&self,
_shared: &SharedDataRef,
_bytes_mutable: usize,
_bytes_total: usize,
) -> bool {
self.should_flush.load(Ordering::Relaxed)
}
}
fn has_parquet_file(sst_dir: &str) -> bool {
for entry in std::fs::read_dir(sst_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if !path.is_dir() {
assert_eq!("parquet", path.extension().unwrap());
return true;
}
}
false
}
#[tokio::test]
async fn test_flush_and_stall() {
common_telemetry::init_default_ut_logging();

View File

@@ -14,10 +14,11 @@
use std::sync::Arc;
use common_error::prelude::BoxedError;
use common_telemetry::tracing::log::info;
use common_telemetry::{error, logging};
use futures::TryStreamExt;
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteResponse};
@@ -74,6 +75,9 @@ impl RegionWriter {
writer_ctx: WriterContext<'_, S>,
) -> Result<WriteResponse> {
let mut inner = self.inner.lock().await;
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
inner
.write(&self.version_mutex, ctx, request, writer_ctx)
.await
@@ -155,6 +159,8 @@ impl RegionWriter {
// alter request to the region.
let inner = self.inner.lock().await;
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
let version_control = alter_ctx.version_control();
let old_metadata = version_control.metadata();
@@ -225,6 +231,43 @@ impl RegionWriter {
Ok(())
}
pub async fn close(&self) -> Result<()> {
// In order to close a writer
// 1. Acquires the write lock.
// 2. Sets a memory flag to reject any potential writing.
// 3. Waits for the pending flush task.
{
let mut inner = self.inner.lock().await;
if inner.is_closed() {
return Ok(());
}
inner.mark_closed();
}
// we release the writer lock once for rejecting any following potential writing requests immediately.
self.cancel_flush().await?;
// TODO: canncel the compaction task
Ok(())
}
/// Cancel flush task if any
async fn cancel_flush(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
if let Some(task) = inner.flush_handle.take() {
task.cancel()
.await
.map_err(BoxedError::new)
.context(error::CancelFlushSnafu)?;
}
Ok(())
}
}
// Private methods for tests.
@@ -275,6 +318,11 @@ impl<'a, S: LogStore> AlterContext<'a, S> {
struct WriterInner {
memtable_builder: MemtableBuilderRef,
flush_handle: Option<JobHandle>,
/// `WriterInner` will reject any future writing, if the closed flag is set.
///
/// It should protected by upper mutex
closed: bool,
engine_config: Arc<EngineConfig>,
}
@@ -284,6 +332,7 @@ impl WriterInner {
memtable_builder,
flush_handle: None,
engine_config,
closed: false,
}
}
@@ -617,4 +666,14 @@ impl WriterInner {
});
Some(schedule_compaction_cb)
}
#[inline]
fn is_closed(&self) -> bool {
self.closed
}
#[inline]
fn mark_closed(&mut self) {
self.closed = true;
}
}

View File

@@ -14,6 +14,7 @@
pub mod config_util;
pub mod descriptor_util;
pub mod flush_switch;
pub mod read_util;
pub mod schema_util;
pub mod write_batch_util;

View File

@@ -0,0 +1,53 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicBool, Ordering};
use crate::flush::FlushStrategy;
use crate::region::SharedDataRef;
#[derive(Debug, Default)]
pub struct FlushSwitch {
should_flush: AtomicBool,
}
impl FlushSwitch {
pub fn set_should_flush(&self, should_flush: bool) {
self.should_flush.store(should_flush, Ordering::Relaxed);
}
}
impl FlushStrategy for FlushSwitch {
fn should_flush(
&self,
_shared: &SharedDataRef,
_bytes_mutable: usize,
_bytes_total: usize,
) -> bool {
self.should_flush.load(Ordering::Relaxed)
}
}
pub fn has_parquet_file(sst_dir: &str) -> bool {
for entry in std::fs::read_dir(sst_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if !path.is_dir() {
assert_eq!("parquet", path.extension().unwrap());
return true;
}
}
false
}

View File

@@ -72,6 +72,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
fn write_request(&self) -> Self::WriteRequest;
async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>;
async fn close(&self) -> Result<(), Self::Error>;
}
/// Context for write operations.