fix: fix doesn't release closed regions issue (#1596)

* fix: fix close region issue

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* refactor: remove close method from Region trait

* chore: remove PartialEq from CloseTableResult
This commit is contained in:
Weny Xu
2023-05-23 12:40:12 +09:00
committed by GitHub
parent e646490d16
commit 9f67ad8bce
13 changed files with 232 additions and 94 deletions

View File

@@ -134,7 +134,7 @@ impl OpenRegionHandler {
for r in region_numbers {
let region_exist =
table
.contain_regions(*r)
.contains_region(*r)
.with_context(|_| error::CheckRegionSnafu {
table_name: format_full_table_name(
catalog_name,

View File

@@ -402,7 +402,7 @@ impl EngineInner {
let tables = self.tables.read().unwrap().clone();
futures::future::try_join_all(tables.values().map(|t| t.close()))
futures::future::try_join_all(tables.values().map(|t| t.close(&[])))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
@@ -435,8 +435,9 @@ impl EngineInner {
let _lock = self.table_mutex.lock().await;
if let Some(table) = self.get_table_by_full_name(&full_name) {
let regions = Vec::new();
table
.close()
.close(&regions)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;

View File

@@ -105,7 +105,7 @@ impl Table for ImmutableFileTable {
Ok(())
}
async fn close(&self) -> TableResult<()> {
async fn close(&self, _region_number: &[RegionNumber]) -> TableResult<()> {
Ok(())
}
}

View File

@@ -37,11 +37,13 @@ use store_api::storage::{
RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{
region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference,
region_name, table_dir, CloseTableResult, EngineContext, TableEngine, TableEngineProcedure,
TableReference,
};
use table::metadata::{TableId, TableInfo, TableVersion};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
AlterKind, AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest,
OpenTableRequest,
};
use table::{error as table_error, Result as TableResult, Table, TableRef};
@@ -193,6 +195,14 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
self.inner.drop_table(request).await
}
async fn close_table(
&self,
_ctx: &EngineContext,
request: CloseTableRequest,
) -> TableResult<CloseTableResult> {
self.inner.close_table(request).await
}
async fn close(&self) -> TableResult<()> {
self.inner.close().await
}
@@ -386,7 +396,7 @@ fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> {
fn all_regions_open(table: TableRef, regions: &[RegionNumber]) -> TableResult<bool> {
for r in regions {
let region_exist = table.contain_regions(*r)?;
let region_exist = table.contains_region(*r)?;
if !region_exist {
return Ok(false);
}
@@ -527,7 +537,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
// TODO(weny): Returns an error earlier if the target region does not exist in the meta.
for region_number in region_numbers {
if table.contain_regions(*region_number)? {
if table.contains_region(*region_number)? {
continue;
}
@@ -610,15 +620,28 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
let removed_table = self.tables.remove(&table_ref.to_string());
// Close the table to close all regions. Closing a region is idempotent.
if let Some((_, table)) = &removed_table {
let regions = table.region_ids();
let table_id = table.table_info().ident.table_id;
table
.drop_regions()
.drop_regions(&regions)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let ctx = StorageEngineContext::default();
// Releases regions in storage engine
for region_number in regions {
self.storage_engine
.close_region(&ctx, &region_name(table_id, region_number))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
Ok(true)
} else {
Ok(false)
@@ -659,7 +682,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
futures::future::try_join_all(
self.tables
.iter()
.map(|item| close_table(self.table_mutex.clone(), item.value().clone())),
.map(|item| self.close_table_inner(item.value().clone(), None)),
)
.await
.map_err(BoxedError::new)
@@ -673,17 +696,55 @@ impl<S: StorageEngine> MitoEngineInner<S> {
Ok(())
}
}
async fn close_table(lock: Arc<KeyLock<String>>, table: TableRef) -> TableResult<()> {
let info = table.table_info();
let table_ref = TableReference {
catalog: &info.catalog_name,
schema: &info.schema_name,
table: &info.name,
};
let _lock = lock.lock(table_ref.to_string()).await;
table.close().await
async fn close_table(&self, request: CloseTableRequest) -> TableResult<CloseTableResult> {
let table_ref = request.table_ref();
if let Some(table) = self.get_mito_table(&table_ref) {
return self
.close_table_inner(table, Some(&request.region_numbers))
.await;
}
// table doesn't exist
Ok(CloseTableResult::NotFound)
}
async fn close_table_inner(
&self,
table: Arc<MitoTable<S::Region>>,
regions: Option<&[RegionNumber]>,
) -> TableResult<CloseTableResult> {
let info = table.table_info();
let table_ref = TableReference {
catalog: &info.catalog_name,
schema: &info.schema_name,
table: &info.name,
};
let table_id = info.ident.table_id;
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
let all_regions = table.region_ids();
let regions = regions.unwrap_or(&all_regions);
let removed = table.remove_regions(regions).await?;
let removed_regions = removed.keys().cloned().collect::<Vec<_>>();
let ctx = StorageEngineContext::default();
// Releases regions in storage engine
for region_number in regions {
self.storage_engine
.close_region(&ctx, &region_name(table_id, *region_number))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
if table.is_releasable() {
self.tables.remove(&table_ref.to_string());
return Ok(CloseTableResult::Released(removed_regions));
}
// Partial closed
Ok(CloseTableResult::PartialClosed(removed_regions))
}
}
impl<S: StorageEngine> MitoEngineInner<S> {

View File

@@ -60,7 +60,6 @@ use crate::error::{
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
#[inline]
fn table_manifest_dir(table_dir: &str) -> String {
format!("{table_dir}/manifest/")
@@ -71,7 +70,7 @@ pub struct MitoTable<R: Region> {
manifest: TableManifest,
// guarded by `self.alter_lock`
table_info: ArcSwap<TableInfo>,
regions: HashMap<RegionNumber, R>,
regions: ArcSwap<HashMap<RegionNumber, R>>,
alter_lock: Mutex<()>,
}
@@ -89,9 +88,8 @@ impl<R: Region> Table for MitoTable<R> {
if request.columns_values.is_empty() {
return Ok(0);
}
let region = self
.regions
let regions = self.regions.load();
let region = regions
.get(&request.region_number)
.with_context(|| RegionNotFoundSnafu {
table: common_catalog::format_full_table_name(
@@ -145,14 +143,16 @@ impl<R: Region> Table for MitoTable<R> {
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
let read_ctx = ReadContext::default();
let mut readers = Vec::with_capacity(self.regions.len());
let regions = self.regions.load();
let mut readers = Vec::with_capacity(regions.len());
let mut first_schema: Option<Arc<Schema>> = None;
let table_info = self.table_info.load();
// TODO(hl): Currently the API between frontend and datanode is under refactoring in
// https://github.com/GreptimeTeam/greptimedb/issues/597 . Once it's finished, query plan
// can carry filtered region info to avoid scanning all regions on datanode.
for region in self.regions.values() {
for region in regions.values() {
let snapshot = region
.snapshot(&read_ctx)
.map_err(BoxedError::new)
@@ -261,10 +261,11 @@ impl<R: Region> Table for MitoTable<R> {
if request.key_column_values.is_empty() {
return Ok(0);
}
let regions = self.regions.load();
let mut rows_deleted = 0;
// TODO(hl): Should be tracked by procedure.
// TODO(hl): Parse delete request into region->keys instead of delete in each region
for region in self.regions.values() {
for region in regions.values() {
let mut write_request = region.write_request();
let key_column_values = request.key_column_values.clone();
// Safety: key_column_values isn't empty.
@@ -301,8 +302,10 @@ impl<R: Region> Table for MitoTable<R> {
reason: FlushReason::Manually,
})
.unwrap_or_default();
let regions = self.regions.load();
if let Some(region_number) = region_number {
if let Some(region) = self.regions.get(&region_number) {
if let Some(region) = regions.get(&region_number) {
region
.flush(&flush_ctx)
.await
@@ -310,37 +313,19 @@ impl<R: Region> Table for MitoTable<R> {
.context(table_error::TableOperationSnafu)?;
}
} else {
futures::future::try_join_all(
self.regions.values().map(|region| region.flush(&flush_ctx)),
)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
futures::future::try_join_all(regions.values().map(|region| region.flush(&flush_ctx)))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
Ok(())
}
async fn close(&self) -> TableResult<()> {
futures::future::try_join_all(self.regions.values().map(|region| region.close()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(())
}
async fn drop_regions(&self) -> TableResult<()> {
futures::future::try_join_all(self.regions.values().map(|region| region.drop_region()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(())
}
fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
Ok(self
.regions
let regions = self.regions.load();
Ok(regions
.values()
.map(|region| RegionStat {
region_id: region.id(),
@@ -349,8 +334,10 @@ impl<R: Region> Table for MitoTable<R> {
.collect())
}
fn contain_regions(&self, region: RegionNumber) -> TableResult<bool> {
Ok(self.regions.contains_key(&region))
fn contains_region(&self, region: RegionNumber) -> TableResult<bool> {
let regions = self.regions.load();
Ok(regions.contains_key(&region))
}
}
@@ -386,7 +373,7 @@ impl<R: Region> MitoTable<R> {
) -> Self {
Self {
table_info: ArcSwap::new(Arc::new(table_info)),
regions,
regions: ArcSwap::new(Arc::new(regions)),
manifest,
alter_lock: Mutex::new(()),
}
@@ -534,9 +521,48 @@ impl<R: Region> MitoTable<R> {
Ok(table_info)
}
/// Remove regions
/// Notes: Please release regions in StorageEngine.
pub async fn remove_regions(
&self,
region_numbers: &[RegionNumber],
) -> TableResult<HashMap<RegionNumber, R>> {
let mut removed = HashMap::with_capacity(region_numbers.len());
self.regions.rcu(|regions| {
removed.clear();
let mut regions = HashMap::clone(regions);
for region_number in region_numbers {
if let Some(region) = regions.remove(region_number) {
removed.insert(*region_number, region);
}
}
Arc::new(regions)
});
Ok(removed)
}
pub async fn drop_regions(&self, region_number: &[RegionNumber]) -> TableResult<()> {
let regions = self.remove_regions(region_number).await?;
futures::future::try_join_all(regions.values().map(|region| region.drop_region()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(())
}
pub fn is_releasable(&self) -> bool {
let regions = self.regions.load();
regions.is_empty()
}
#[inline]
pub fn regions(&self) -> &HashMap<RegionNumber, R> {
&self.regions
pub fn region_ids(&self) -> Vec<RegionNumber> {
let regions = self.regions.load();
regions.iter().map(|(k, _)| *k).collect()
}
pub fn set_table_info(&self, table_info: TableInfo) {
@@ -560,7 +586,7 @@ impl<R: Region> MitoTable<R> {
table_version: TableVersion,
alter_op: &AlterOperation,
) -> TableResult<()> {
let regions = self.regions();
let regions = self.regions.load();
for region in regions.values() {
let region_meta = region.in_memory_metadata();
if u64::from(region_meta.version()) > table_version {
@@ -590,7 +616,7 @@ impl<R: Region> MitoTable<R> {
}
pub async fn load_region(&self, region_number: RegionNumber, _region: R) -> TableResult<()> {
let info = self.table_info.load_full();
let info = self.table_info.load();
// TODO(weny): Supports to load the region
warn!(

View File

@@ -193,10 +193,6 @@ impl Region for MockRegion {
Ok(())
}
async fn close(&self) -> Result<()> {
Ok(())
}
async fn drop_region(&self) -> Result<()> {
Ok(())
}
@@ -295,8 +291,14 @@ impl StorageEngine for MockEngine {
return Ok(None);
}
async fn close_region(&self, _ctx: &EngineContext, region: MockRegion) -> Result<()> {
region.close().await
async fn close_region(&self, _ctx: &EngineContext, name: &str) -> Result<()> {
let mut regions = self.regions.lock().unwrap();
if let Some(region) = regions.opened_regions.remove(name) {
regions.closed_regions.insert(name.to_string(), region);
}
Ok(())
}
async fn create_region(

View File

@@ -68,8 +68,8 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
self.inner.open_region(name, opts).await
}
async fn close_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> {
region.close().await
async fn close_region(&self, _ctx: &EngineContext, name: &str) -> Result<()> {
self.inner.close_region(name).await
}
async fn create_region(
@@ -362,6 +362,16 @@ impl<S: LogStore> EngineInner<S> {
})
}
async fn close_region(&self, name: &str) -> Result<()> {
if let Some(region) = self.get_region(name) {
region.close().await?;
}
self.regions.remove(name);
Ok(())
}
async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result<Option<RegionImpl<S>>> {
// We can wait until the state of the slot has been changed to ready, but this will
// make the code more complicate, so we just return the error here.
@@ -627,7 +637,7 @@ mod tests {
// Flush memtable to sst.
region.flush(&FlushContext::default()).await.unwrap();
engine.close_region(&ctx, region).await.unwrap();
engine.close_region(&ctx, region.name()).await.unwrap();
let dir_path = dir.path().join(region_name);

View File

@@ -126,11 +126,6 @@ impl<S: LogStore> Region for RegionImpl<S> {
self.inner.alter(request).await
}
async fn close(&self) -> Result<()> {
decrement_gauge!(crate::metrics::REGION_COUNT, 1.0);
self.inner.close().await
}
async fn drop_region(&self) -> Result<()> {
self.inner.drop_region().await
}
@@ -540,6 +535,11 @@ impl<S: LogStore> RegionImpl<S> {
pub async fn compact(&self, ctx: CompactContext) -> Result<()> {
self.inner.compact(ctx).await
}
pub async fn close(&self) -> Result<()> {
decrement_gauge!(crate::metrics::REGION_COUNT, 1.0);
self.inner.close().await
}
}
// Private methods for tests.

View File

@@ -41,11 +41,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
) -> Result<Option<Self::Region>, Self::Error>;
/// Closes given region.
async fn close_region(
&self,
ctx: &EngineContext,
region: Self::Region,
) -> Result<(), Self::Error>;
async fn close_region(&self, ctx: &EngineContext, name: &str) -> Result<(), Self::Error>;
/// Creates and returns the created region.
///

View File

@@ -73,8 +73,6 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>;
async fn close(&self) -> Result<(), Self::Error>;
async fn drop_region(&self) -> Result<(), Self::Error>;
fn disk_usage_bytes(&self) -> u64;

View File

@@ -16,11 +16,13 @@ use std::fmt::{self, Display};
use std::sync::Arc;
use common_procedure::BoxedProcedure;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use crate::error::Result;
use crate::error::{self, Result};
use crate::metadata::TableId;
use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use crate::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
};
use crate::TableRef;
pub mod manager;
@@ -59,6 +61,19 @@ impl<'a> Display for TableReference<'a> {
}
}
/// CloseTableResult
///
/// Returns [`CloseTableResult::Released`] and closed region numbers if a table was removed
/// from the engine.
/// Returns [`CloseTableResult::PartialClosed`] and closed region numbers if only partial
/// regions were closed.
#[derive(Debug)]
pub enum CloseTableResult {
Released(Vec<RegionNumber>),
PartialClosed(Vec<RegionNumber>),
NotFound,
}
/// Table engine abstraction.
#[async_trait::async_trait]
pub trait TableEngine: Send + Sync {
@@ -104,7 +119,21 @@ pub trait TableEngine: Send + Sync {
/// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist.
async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<bool>;
/// Close the table.
/// Closes the (partial) given table.
///
/// Removes a table from the engine if all regions are closed.
async fn close_table(
&self,
_ctx: &EngineContext,
_request: CloseTableRequest,
) -> Result<CloseTableResult> {
error::UnsupportedSnafu {
operation: "close_table",
}
.fail()?
}
/// Close the engine.
async fn close(&self) -> Result<()>;
}

View File

@@ -234,6 +234,26 @@ impl DropTableRequest {
}
}
/// Drop table request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CloseTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
/// Do nothing if region_numbers is empty
pub region_numbers: Vec<RegionNumber>,
}
impl CloseTableRequest {
pub fn table_ref(&self) -> TableReference {
TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: &self.table_name,
}
}
}
#[derive(Debug)]
pub struct InsertRequest {
pub catalog_name: String,

View File

@@ -108,12 +108,7 @@ pub trait Table: Send + Sync {
}
/// Close the table.
async fn close(&self) -> Result<()> {
Ok(())
}
/// Drop regions
async fn drop_regions(&self) -> Result<()> {
async fn close(&self, _regions: &[RegionNumber]) -> Result<()> {
Ok(())
}
@@ -126,7 +121,7 @@ pub trait Table: Send + Sync {
}
/// Return true if contains the region
fn contain_regions(&self, _region: RegionNumber) -> Result<bool> {
fn contains_region(&self, _region: RegionNumber) -> Result<bool> {
UnsupportedSnafu {
operation: "contain_region",
}