diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index a1172b68cd..3c2273880b 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -134,7 +134,7 @@ impl OpenRegionHandler { for r in region_numbers { let region_exist = table - .contain_regions(*r) + .contains_region(*r) .with_context(|_| error::CheckRegionSnafu { table_name: format_full_table_name( catalog_name, diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index 0e1176ee15..1814a33066 100644 --- a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -402,7 +402,7 @@ impl EngineInner { let tables = self.tables.read().unwrap().clone(); - futures::future::try_join_all(tables.values().map(|t| t.close())) + futures::future::try_join_all(tables.values().map(|t| t.close(&[]))) .await .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; @@ -435,8 +435,9 @@ impl EngineInner { let _lock = self.table_mutex.lock().await; if let Some(table) = self.get_table_by_full_name(&full_name) { + let regions = Vec::new(); table - .close() + .close(&regions) .await .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs index b0bcf29d4a..3f80253b0f 100644 --- a/src/file-table-engine/src/table/immutable.rs +++ b/src/file-table-engine/src/table/immutable.rs @@ -105,7 +105,7 @@ impl Table for ImmutableFileTable { Ok(()) } - async fn close(&self) -> TableResult<()> { + async fn close(&self, _region_number: &[RegionNumber]) -> TableResult<()> { Ok(()) } } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 8ba37a72d1..a13877053c 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -37,11 +37,13 @@ use store_api::storage::{ 
RowKeyDescriptorBuilder, StorageEngine, }; use table::engine::{ - region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference, + region_name, table_dir, CloseTableResult, EngineContext, TableEngine, TableEngineProcedure, + TableReference, }; use table::metadata::{TableId, TableInfo, TableVersion}; use table::requests::{ - AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, + AlterKind, AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, + OpenTableRequest, }; use table::{error as table_error, Result as TableResult, Table, TableRef}; @@ -193,6 +195,14 @@ impl TableEngine for MitoEngine { self.inner.drop_table(request).await } + async fn close_table( + &self, + _ctx: &EngineContext, + request: CloseTableRequest, + ) -> TableResult { + self.inner.close_table(request).await + } + async fn close(&self) -> TableResult<()> { self.inner.close().await } @@ -386,7 +396,7 @@ fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> { fn all_regions_open(table: TableRef, regions: &[RegionNumber]) -> TableResult { for r in regions { - let region_exist = table.contain_regions(*r)?; + let region_exist = table.contains_region(*r)?; if !region_exist { return Ok(false); } @@ -527,7 +537,7 @@ impl MitoEngineInner { // TODO(weny): Returns an error earlier if the target region does not exist in the meta. for region_number in region_numbers { - if table.contain_regions(*region_number)? { + if table.contains_region(*region_number)? { continue; } @@ -610,15 +620,28 @@ impl MitoEngineInner { let _lock = self.table_mutex.lock(table_ref.to_string()).await; let removed_table = self.tables.remove(&table_ref.to_string()); - // Close the table to close all regions. Closing a region is idempotent. 
if let Some((_, table)) = &removed_table { + let regions = table.region_ids(); + let table_id = table.table_info().ident.table_id; + table - .drop_regions() + .drop_regions(&regions) .await .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; + let ctx = StorageEngineContext::default(); + + // Releases regions in storage engine + for region_number in regions { + self.storage_engine + .close_region(&ctx, &region_name(table_id, region_number)) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + } + Ok(true) } else { Ok(false) @@ -659,7 +682,7 @@ impl MitoEngineInner { futures::future::try_join_all( self.tables .iter() - .map(|item| close_table(self.table_mutex.clone(), item.value().clone())), + .map(|item| self.close_table_inner(item.value().clone(), None)), ) .await .map_err(BoxedError::new) @@ -673,17 +696,55 @@ impl MitoEngineInner { Ok(()) } -} -async fn close_table(lock: Arc>, table: TableRef) -> TableResult<()> { - let info = table.table_info(); - let table_ref = TableReference { - catalog: &info.catalog_name, - schema: &info.schema_name, - table: &info.name, - }; - let _lock = lock.lock(table_ref.to_string()).await; - table.close().await + async fn close_table(&self, request: CloseTableRequest) -> TableResult { + let table_ref = request.table_ref(); + if let Some(table) = self.get_mito_table(&table_ref) { + return self + .close_table_inner(table, Some(&request.region_numbers)) + .await; + } + // table doesn't exist + Ok(CloseTableResult::NotFound) + } + + async fn close_table_inner( + &self, + table: Arc>, + regions: Option<&[RegionNumber]>, + ) -> TableResult { + let info = table.table_info(); + let table_ref = TableReference { + catalog: &info.catalog_name, + schema: &info.schema_name, + table: &info.name, + }; + let table_id = info.ident.table_id; + let _lock = self.table_mutex.lock(table_ref.to_string()).await; + + let all_regions = table.region_ids(); + let regions = regions.unwrap_or(&all_regions); + let 
removed = table.remove_regions(regions).await?; + let removed_regions = removed.keys().cloned().collect::>(); + let ctx = StorageEngineContext::default(); + + // Releases regions in storage engine + for region_number in regions { + self.storage_engine + .close_region(&ctx, &region_name(table_id, *region_number)) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + } + + if table.is_releasable() { + self.tables.remove(&table_ref.to_string()); + return Ok(CloseTableResult::Released(removed_regions)); + } + + // Partial closed + Ok(CloseTableResult::PartialClosed(removed_regions)) + } } impl MitoEngineInner { diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 6e65ac70e3..a4c2b502e5 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -60,7 +60,6 @@ use crate::error::{ }; use crate::manifest::action::*; use crate::manifest::TableManifest; - #[inline] fn table_manifest_dir(table_dir: &str) -> String { format!("{table_dir}/manifest/") @@ -71,7 +70,7 @@ pub struct MitoTable { manifest: TableManifest, // guarded by `self.alter_lock` table_info: ArcSwap, - regions: HashMap, + regions: ArcSwap>, alter_lock: Mutex<()>, } @@ -89,9 +88,8 @@ impl Table for MitoTable { if request.columns_values.is_empty() { return Ok(0); } - - let region = self - .regions + let regions = self.regions.load(); + let region = regions .get(&request.region_number) .with_context(|| RegionNotFoundSnafu { table: common_catalog::format_full_table_name( @@ -145,14 +143,16 @@ impl Table for MitoTable { _limit: Option, ) -> TableResult { let read_ctx = ReadContext::default(); - let mut readers = Vec::with_capacity(self.regions.len()); + let regions = self.regions.load(); + + let mut readers = Vec::with_capacity(regions.len()); let mut first_schema: Option> = None; let table_info = self.table_info.load(); // TODO(hl): Currently the API between frontend and datanode is under refactoring in // https://github.com/GreptimeTeam/greptimedb/issues/597 . 
Once it's finished, query plan // can carry filtered region info to avoid scanning all regions on datanode. - for region in self.regions.values() { + for region in regions.values() { let snapshot = region .snapshot(&read_ctx) .map_err(BoxedError::new) @@ -261,10 +261,11 @@ impl Table for MitoTable { if request.key_column_values.is_empty() { return Ok(0); } + let regions = self.regions.load(); let mut rows_deleted = 0; // TODO(hl): Should be tracked by procedure. // TODO(hl): Parse delete request into region->keys instead of delete in each region - for region in self.regions.values() { + for region in regions.values() { let mut write_request = region.write_request(); let key_column_values = request.key_column_values.clone(); // Safety: key_column_values isn't empty. @@ -301,8 +302,10 @@ impl Table for MitoTable { reason: FlushReason::Manually, }) .unwrap_or_default(); + let regions = self.regions.load(); + if let Some(region_number) = region_number { - if let Some(region) = self.regions.get(&region_number) { + if let Some(region) = regions.get(&region_number) { region .flush(&flush_ctx) .await @@ -310,37 +313,19 @@ impl Table for MitoTable { .context(table_error::TableOperationSnafu)?; } } else { - futures::future::try_join_all( - self.regions.values().map(|region| region.flush(&flush_ctx)), - ) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; + futures::future::try_join_all(regions.values().map(|region| region.flush(&flush_ctx))) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; } Ok(()) } - async fn close(&self) -> TableResult<()> { - futures::future::try_join_all(self.regions.values().map(|region| region.close())) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - - Ok(()) - } - - async fn drop_regions(&self) -> TableResult<()> { - futures::future::try_join_all(self.regions.values().map(|region| region.drop_region())) - .await - .map_err(BoxedError::new) - 
.context(table_error::TableOperationSnafu)?; - Ok(()) - } - fn region_stats(&self) -> TableResult> { - Ok(self - .regions + let regions = self.regions.load(); + + Ok(regions .values() .map(|region| RegionStat { region_id: region.id(), @@ -349,8 +334,10 @@ impl Table for MitoTable { .collect()) } - fn contain_regions(&self, region: RegionNumber) -> TableResult { - Ok(self.regions.contains_key(&region)) + fn contains_region(&self, region: RegionNumber) -> TableResult { + let regions = self.regions.load(); + + Ok(regions.contains_key(&region)) } } @@ -386,7 +373,7 @@ impl MitoTable { ) -> Self { Self { table_info: ArcSwap::new(Arc::new(table_info)), - regions, + regions: ArcSwap::new(Arc::new(regions)), manifest, alter_lock: Mutex::new(()), } @@ -534,9 +521,48 @@ impl MitoTable { Ok(table_info) } + /// Remove regions + /// Notes: Please release regions in StorageEngine. + pub async fn remove_regions( + &self, + region_numbers: &[RegionNumber], + ) -> TableResult> { + let mut removed = HashMap::with_capacity(region_numbers.len()); + self.regions.rcu(|regions| { + removed.clear(); + let mut regions = HashMap::clone(regions); + for region_number in region_numbers { + if let Some(region) = regions.remove(region_number) { + removed.insert(*region_number, region); + } + } + + Arc::new(regions) + }); + + Ok(removed) + } + + pub async fn drop_regions(&self, region_number: &[RegionNumber]) -> TableResult<()> { + let regions = self.remove_regions(region_number).await?; + + futures::future::try_join_all(regions.values().map(|region| region.drop_region())) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + Ok(()) + } + + pub fn is_releasable(&self) -> bool { + let regions = self.regions.load(); + + regions.is_empty() + } + #[inline] - pub fn regions(&self) -> &HashMap { - &self.regions + pub fn region_ids(&self) -> Vec { + let regions = self.regions.load(); + regions.iter().map(|(k, _)| *k).collect() } pub fn set_table_info(&self, table_info: 
TableInfo) { @@ -560,7 +586,7 @@ impl MitoTable { table_version: TableVersion, alter_op: &AlterOperation, ) -> TableResult<()> { - let regions = self.regions(); + let regions = self.regions.load(); for region in regions.values() { let region_meta = region.in_memory_metadata(); if u64::from(region_meta.version()) > table_version { @@ -590,7 +616,7 @@ impl MitoTable { } pub async fn load_region(&self, region_number: RegionNumber, _region: R) -> TableResult<()> { - let info = self.table_info.load_full(); + let info = self.table_info.load(); // TODO(weny): Supports to load the region warn!( diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index 70545c2964..5f776a71cb 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -193,10 +193,6 @@ impl Region for MockRegion { Ok(()) } - async fn close(&self) -> Result<()> { - Ok(()) - } - async fn drop_region(&self) -> Result<()> { Ok(()) } @@ -295,8 +291,14 @@ impl StorageEngine for MockEngine { return Ok(None); } - async fn close_region(&self, _ctx: &EngineContext, region: MockRegion) -> Result<()> { - region.close().await + async fn close_region(&self, _ctx: &EngineContext, name: &str) -> Result<()> { + let mut regions = self.regions.lock().unwrap(); + + if let Some(region) = regions.opened_regions.remove(name) { + regions.closed_regions.insert(name.to_string(), region); + } + + Ok(()) } async fn create_region( diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 68c222b7b6..825b12cb48 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -68,8 +68,8 @@ impl StorageEngine for EngineImpl { self.inner.open_region(name, opts).await } - async fn close_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> { - region.close().await + async fn close_region(&self, _ctx: &EngineContext, name: &str) -> Result<()> { + self.inner.close_region(name).await } async fn 
create_region( @@ -362,6 +362,16 @@ impl EngineInner { }) } + async fn close_region(&self, name: &str) -> Result<()> { + if let Some(region) = self.get_region(name) { + region.close().await?; + } + + self.regions.remove(name); + + Ok(()) + } + async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result>> { // We can wait until the state of the slot has been changed to ready, but this will // make the code more complicate, so we just return the error here. @@ -627,7 +637,7 @@ mod tests { // Flush memtable to sst. region.flush(&FlushContext::default()).await.unwrap(); - engine.close_region(&ctx, region).await.unwrap(); + engine.close_region(&ctx, region.name()).await.unwrap(); let dir_path = dir.path().join(region_name); diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index f8f6f7d0b8..23d40be669 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -126,11 +126,6 @@ impl Region for RegionImpl { self.inner.alter(request).await } - async fn close(&self) -> Result<()> { - decrement_gauge!(crate::metrics::REGION_COUNT, 1.0); - self.inner.close().await - } - async fn drop_region(&self) -> Result<()> { self.inner.drop_region().await } @@ -540,6 +535,11 @@ impl RegionImpl { pub async fn compact(&self, ctx: CompactContext) -> Result<()> { self.inner.compact(ctx).await } + + pub async fn close(&self) -> Result<()> { + decrement_gauge!(crate::metrics::REGION_COUNT, 1.0); + self.inner.close().await + } } // Private methods for tests. diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs index fb055fa5b5..a4202cd0e8 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -41,11 +41,7 @@ pub trait StorageEngine: Send + Sync + Clone + 'static { ) -> Result, Self::Error>; /// Closes given region. 
- async fn close_region( - &self, - ctx: &EngineContext, - region: Self::Region, - ) -> Result<(), Self::Error>; + async fn close_region(&self, ctx: &EngineContext, name: &str) -> Result<(), Self::Error>; /// Creates and returns the created region. /// diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index 61c05f750b..9890695386 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -73,8 +73,6 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>; - async fn close(&self) -> Result<(), Self::Error>; - async fn drop_region(&self) -> Result<(), Self::Error>; fn disk_usage_bytes(&self) -> u64; diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 07fcaf5a4f..2987fc2ed1 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -16,11 +16,13 @@ use std::fmt::{self, Display}; use std::sync::Arc; use common_procedure::BoxedProcedure; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, RegionNumber}; -use crate::error::Result; +use crate::error::{self, Result}; use crate::metadata::TableId; -use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; +use crate::requests::{ + AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, +}; use crate::TableRef; pub mod manager; @@ -59,6 +61,19 @@ impl<'a> Display for TableReference<'a> { } } +/// CloseTableResult +/// +/// Returns [`CloseTableResult::Released`] and closed region numbers if a table was removed +/// from the engine. +/// Returns [`CloseTableResult::PartialClosed`] and closed region numbers if only partial +/// regions were closed. +#[derive(Debug)] +pub enum CloseTableResult { + Released(Vec), + PartialClosed(Vec), + NotFound, +} + /// Table engine abstraction. 
#[async_trait::async_trait] pub trait TableEngine: Send + Sync { @@ -104,7 +119,21 @@ pub trait TableEngine: Send + Sync { /// Drops the given table. Return true if the table is dropped, or false if the table doesn't exist. async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result; - /// Close the table. + /// Closes the (partial) given table. + /// + /// Removes a table from the engine if all regions are closed. + async fn close_table( + &self, + _ctx: &EngineContext, + _request: CloseTableRequest, + ) -> Result { + error::UnsupportedSnafu { + operation: "close_table", + } + .fail()? + } + + /// Close the engine. async fn close(&self) -> Result<()>; } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index a383148a79..b7ed352501 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -234,6 +234,26 @@ impl DropTableRequest { } } +/// Close table request +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CloseTableRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + /// Do nothing if region_numbers is empty + pub region_numbers: Vec, +} + +impl CloseTableRequest { + pub fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.catalog_name, + schema: &self.schema_name, + table: &self.table_name, + } + } +} + #[derive(Debug)] pub struct InsertRequest { pub catalog_name: String, diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 1103ba82d7..b2bd036f57 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -108,12 +108,7 @@ pub trait Table: Send + Sync { } /// Close the table. 
- async fn close(&self) -> Result<()> { - Ok(()) - } - - /// Drop regions - async fn drop_regions(&self) -> Result<()> { + async fn close(&self, _regions: &[RegionNumber]) -> Result<()> { Ok(()) } @@ -126,7 +121,7 @@ pub trait Table: Send + Sync { } /// Return true if contains the region - fn contain_regions(&self, _region: RegionNumber) -> Result { + fn contains_region(&self, _region: RegionNumber) -> Result { UnsupportedSnafu { operation: "contain_region", }