diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs
index 1b7ecda840..09916e0055 100644
--- a/src/log-store/src/noop.rs
+++ b/src/log-store/src/noop.rs
@@ -80,11 +80,11 @@ impl LogStore for NoopLogStore {
         )))))
     }

-    async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
+    async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
         Ok(())
     }

-    async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
+    async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
         Ok(())
     }

@@ -128,7 +128,7 @@ mod tests {

     #[tokio::test]
     async fn test_noop_logstore() {
-        let mut store = NoopLogStore::default();
+        let store = NoopLogStore::default();
         let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
         store.append(e.clone()).await.unwrap();
         store
diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
index c7218ac8c8..0ce32d6eb2 100644
--- a/src/log-store/src/raft_engine/log_store.rs
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -220,7 +220,7 @@ impl LogStore for RaftEngineLogStore {
         Ok(Box::pin(s))
     }

-    async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error> {
+    async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> {
         ensure!(
             ns.id != SYSTEM_NAMESPACE,
             IllegalNamespaceSnafu { ns: ns.id }
@@ -237,7 +237,7 @@ impl LogStore for RaftEngineLogStore {
         Ok(())
     }

-    async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error> {
+    async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> {
         ensure!(
             ns.id != SYSTEM_NAMESPACE,
             IllegalNamespaceSnafu { ns: ns.id }
@@ -343,7 +343,7 @@ mod tests {
     #[tokio::test]
     async fn test_manage_namespace() {
         let dir = create_temp_dir("raft-engine-logstore-test");
-        let mut logstore = RaftEngineLogStore::try_new(LogConfig {
+        let logstore = RaftEngineLogStore::try_new(LogConfig {
             log_file_dir: dir.path().to_str().unwrap().to_string(),
             ..Default::default()
         })
diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index 8e8a19656f..8ba37a72d1 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -614,7 +614,7 @@ impl MitoEngineInner {
         // Close the table to close all regions. Closing a region is idempotent.
         if let Some((_, table)) = &removed_table {
             table
-                .close()
+                .drop_regions()
                 .await
                 .map_err(BoxedError::new)
                 .context(table_error::TableOperationSnafu)?;
diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs
index d7048607ba..ae573c5384 100644
--- a/src/mito/src/table.rs
+++ b/src/mito/src/table.rs
@@ -330,6 +330,14 @@ impl Table for MitoTable {
         Ok(())
     }

+    async fn drop_regions(&self) -> TableResult<()> {
+        futures::future::try_join_all(self.regions.values().map(|region| region.drop_region()))
+            .await
+            .map_err(BoxedError::new)
+            .context(table_error::TableOperationSnafu)?;
+        Ok(())
+    }
+
     fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
         Ok(self
             .regions
diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs
index a1dcedd7a2..70545c2964 100644
--- a/src/mito/src/table/test_util/mock_engine.rs
+++ b/src/mito/src/table/test_util/mock_engine.rs
@@ -197,6 +197,10 @@ impl Region for MockRegion {
         Ok(())
     }

+    async fn drop_region(&self) -> Result<()> {
+        Ok(())
+    }
+
     fn disk_usage_bytes(&self) -> u64 {
         0
     }
diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs
index ddf1269751..f57859b68b 100644
--- a/src/storage/src/engine.rs
+++ b/src/storage/src/engine.rs
@@ -81,8 +81,10 @@ impl StorageEngine for EngineImpl {
         self.inner.create_region(descriptor, opts).await
     }

-    async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
-        unimplemented!()
+    async fn drop_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> {
+        region.drop_region().await?;
+        self.inner.remove_region(region.name());
+        Ok(())
     }

     fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result<Option<Self::Region>> {
@@ -433,6 +435,10 @@ impl EngineInner {
         self.regions.get_region(name)
     }

+    fn remove_region(&self, name: &str) {
+        self.regions.remove(name)
+    }
+
     async fn region_store_config(
         &self,
         parent_dir: &str,
@@ -494,23 +500,35 @@

 #[cfg(test)]
 mod tests {
-    use common_test_util::temp_dir::create_temp_dir;
+    use std::ffi::OsStr;
+    use std::path::Path;
+
+    use common_test_util::temp_dir::{create_temp_dir, TempDir};
     use datatypes::type_id::LogicalTypeId;
+    use datatypes::vectors::{Float32Vector, Int32Vector, TimestampMillisecondVector, VectorRef};
+    use log_store::raft_engine::log_store::RaftEngineLogStore;
     use log_store::test_util::log_store_util;
     use object_store::services::Fs;
-    use store_api::storage::Region;
+    use store_api::storage::{FlushContext, Region, WriteContext, WriteRequest};

     use super::*;
     use crate::compaction::noop::NoopCompactionScheduler;
     use crate::test_util::descriptor_util::RegionDescBuilder;

-    #[tokio::test]
-    async fn test_create_new_region() {
-        let log_file_dir = create_temp_dir("test_engine_wal");
+    type TestEngine = EngineImpl<RaftEngineLogStore>;
+    type TestRegion = RegionImpl<RaftEngineLogStore>;
+
+    async fn create_engine_and_region(
+        tmp_dir: &TempDir,
+        log_file_dir: &TempDir,
+        region_name: &str,
+        region_id: u64,
+        ctx: &EngineContext,
+    ) -> (TestEngine, TestRegion) {
         let log_file_dir_path = log_file_dir.path().to_str().unwrap();
         let log_store = log_store_util::create_tmp_local_file_log_store(log_file_dir_path).await;
-        let dir = create_temp_dir("test_create_new_region");
-        let store_dir = dir.path().to_string_lossy();
+
+        let store_dir = tmp_dir.path().to_string_lossy();

         let mut builder = Fs::default();
         builder.root(&store_dir);
@@ -528,17 +546,40 @@
         )
         .unwrap();

-        let region_name = "region-0";
         let desc = RegionDescBuilder::new(region_name)
+            .id(region_id)
             .push_key_column(("k1", LogicalTypeId::Int32, false))
.push_field_column(("v1", LogicalTypeId::Float32, true)) + .timestamp(("ts", LogicalTypeId::TimestampMillisecond, false)) .build(); - let ctx = EngineContext::default(); + let region = engine - .create_region(&ctx, desc, &CreateOptions::default()) + .create_region(ctx, desc, &CreateOptions::default()) .await .unwrap(); + (engine, region) + } + + fn parquet_file_num(path: &Path) -> usize { + path.read_dir() + .unwrap() + .filter_map(|entry| entry.ok()) + .filter(|entry| entry.path().extension() == Some(OsStr::new("parquet"))) + .count() + } + + #[tokio::test] + async fn test_create_new_region() { + let dir = create_temp_dir("test_create_region"); + let log_file_dir = create_temp_dir("test_engine_wal"); + + let region_name = "region-0"; + let region_id = 123456; + let ctx = EngineContext::default(); + + let (engine, region) = + create_engine_and_region(&dir, &log_file_dir, region_name, region_id, &ctx).await; assert_eq!(region_name, region.name()); let region2 = engine.get_region(&ctx, region_name).unwrap().unwrap(); @@ -546,4 +587,56 @@ mod tests { assert!(engine.get_region(&ctx, "no such region").unwrap().is_none()); } + + #[tokio::test] + async fn test_drop_region() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("test_drop_region"); + let log_file_dir = create_temp_dir("test_engine_wal"); + + let region_name = "test_region"; + let region_id = 123456; + let ctx = EngineContext::default(); + + let (engine, region) = + create_engine_and_region(&dir, &log_file_dir, region_name, region_id, &ctx).await; + + assert_eq!(region_name, region.name()); + + let mut wb = region.write_request(); + let k1 = Arc::new(Int32Vector::from_slice([1, 2, 3])) as VectorRef; + let v1 = Arc::new(Float32Vector::from_slice([0.1, 0.2, 0.3])) as VectorRef; + let tsv = Arc::new(TimestampMillisecondVector::from_slice([0, 0, 0])) as VectorRef; + + let mut put_data = HashMap::with_capacity(4); + put_data.insert("k1".to_string(), k1); + put_data.insert("v1".to_string(), v1); + put_data.insert("ts".to_string(), tsv); + + wb.put(put_data).unwrap(); + region.write(&WriteContext::default(), wb).await.unwrap(); + + // Flush memtable to sst. + region.flush(&FlushContext::default()).await.unwrap(); + engine.close_region(&ctx, region).await.unwrap(); + + let dir_path = dir.path().join(region_name); + + assert_eq!(1, parquet_file_num(&dir_path)); + + { + let region = engine + .open_region(&ctx, region_name, &OpenOptions::default()) + .await + .unwrap() + .unwrap(); + + engine.drop_region(&ctx, region).await.unwrap(); + assert!(engine.get_region(&ctx, region_name).unwrap().is_none()); + } + + // Wait for gc + tokio::time::sleep(Duration::from_millis(60)).await; + assert_eq!(0, parquet_file_num(&dir_path)); + } } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 1a62cc872f..f6fd3b9361 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -240,6 +240,17 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to delete WAL namespace, region id: {}, source: {}", + region_id, + source + ))] + DeleteWalNamespace { + region_id: RegionId, + location: Location, + source: BoxedError, + }, + #[snafu(display( "Sequence of region should increase monotonically (should be {} < {})", prev, @@ -600,6 +611,7 @@ impl ErrorExt for Error { CreateDefault { source, .. } => source.status_code(), ConvertChunk { source, .. } => source.status_code(), MarkWalObsolete { source, .. } => source.status_code(), + DeleteWalNamespace { source, .. 
             DecodeParquetTimeRange { .. } => StatusCode::Unexpected,
             RateLimited { .. } | StopScheduler { .. } | CompactTaskCancel { .. } => {
                 StatusCode::Internal
diff --git a/src/storage/src/manifest/impl_.rs b/src/storage/src/manifest/impl_.rs
index 8478148265..699775fb71 100644
--- a/src/storage/src/manifest/impl_.rs
+++ b/src/storage/src/manifest/impl_.rs
@@ -388,7 +388,7 @@ impl, M: MetaAction> ManifestImplInn
                     // It happens when saving checkpoint successfully, but failed at saving checkpoint metadata(the "__last_checkpoint" file).
                     // Then we try to use the old checkpoint and do the checkpoint next time.
                     // If the old checkpoint was deleted, it's fine that we return the latest checkpoint.
-                    // the only side effect is leaving some unused checkpoint checkpoint files,
+                    // The only side effect is leaving some unused checkpoint files,
                     // and they will be purged by gc task.
                     warn!("The checkpoint manifest version {} in {} is greater than checkpoint metadata version {}.",
                           self.store.path(), checkpoint.last_version(), version);
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs
index 461ec3a0c6..5914e86644 100644
--- a/src/storage/src/region.rs
+++ b/src/storage/src/region.rs
@@ -46,6 +46,7 @@ use crate::manifest::region::RegionManifest;
 use crate::memtable::MemtableBuilderRef;
 use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef};
 pub(crate) use crate::region::writer::schedule_compaction;
+use crate::region::writer::DropContext;
 pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext};
 use crate::schema::compat::CompatWrite;
 use crate::snapshot::SnapshotImpl;
@@ -130,6 +131,10 @@ impl Region for RegionImpl {
         self.inner.close().await
     }

+    async fn drop_region(&self) -> Result<()> {
+        self.inner.drop_region().await
+    }
+
     fn disk_usage_bytes(&self) -> u64 {
         let version = self.inner.version_control().current();
         version
@@ -675,6 +680,21 @@ impl RegionInner {
         self.manifest.stop().await
     }

+    async fn drop_region(&self) -> Result<()> {
+        logging::info!("Drop region {}, name: {}", self.shared.id, self.shared.name);
+        let drop_ctx = DropContext {
+            shared: &self.shared,
+            wal: &self.wal,
+            manifest: &self.manifest,
+            flush_scheduler: &self.flush_scheduler,
+            compaction_scheduler: &self.compaction_scheduler,
+            sst_layer: &self.sst_layer,
+        };
+
+        self.manifest.stop().await?;
+        self.writer.on_drop(drop_ctx).await
+    }
+
     async fn flush(&self, ctx: &FlushContext) -> Result<()> {
         let writer_ctx = WriterContext {
             shared: &self.shared,
diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs
index 991a11a571..52eb84cbba 100644
--- a/src/storage/src/region/writer.rs
+++ b/src/storage/src/region/writer.rs
@@ -33,6 +33,7 @@ use crate::error::{self, Result};
 use crate::flush::{FlushHandle, FlushRequest, FlushSchedulerRef, FlushStrategyRef};
 use crate::manifest::action::{
     RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList,
+    RegionRemove,
 };
 use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
 use crate::metadata::RegionMetadataRef;
@@ -270,7 +271,58 @@ impl RegionWriter {

         self.wait_flush().await?;

-        // TODO: canncel the compaction task
+        // TODO: cancel the compaction task
+
+        Ok(())
+    }
+
+    pub async fn on_drop<S: LogStore>(&self, drop_ctx: DropContext<'_, S>) -> Result<()> {
+        // 1. Acquires the write lock.
+        // 2. Closes the writer to reject any potential writes.
+        // 3. Waits for or cancels the flush job.
+        // 4. Adds `RegionMetaAction::Remove` to the manifest so the drop can be recovered in case of failure.
+        //    The main task is to resume the cleanup of the SST files: if a failure happens
+        //    in the previous steps, the cleanup can be resumed through the `Procedure` framework.
+        // 5. Marks all data obsolete in the WAL.
+        // 6. Deletes the namespace of the region from the WAL.
+        // 7. Marks all SSTs deleted.
+        let mut inner = self.inner.lock().await;
+        inner.mark_closed();
+
+        if let Some(handle) = inner.flush_handle.take() {
+            handle.wait().await?;
+        }
+
+        let version_control = drop_ctx.version_control();
+
+        let _lock = self.version_mutex.lock().await;
+        let committed_sequence = version_control.committed_sequence();
+        let current_version = version_control.current();
+
+        let mut action_list =
+            RegionMetaActionList::with_action(RegionMetaAction::Remove(RegionRemove {
+                region_id: drop_ctx.shared.id,
+            }));
+
+        // Persist the meta action.
+        let prev_version = version_control.current_manifest_version();
+        action_list.set_prev_version(prev_version);
+
+        logging::debug!(
+            "Try to remove region {}, action_list: {:?}",
+            drop_ctx.shared.name(),
+            action_list
+        );
+
+        drop_ctx.manifest.update(action_list).await?;
+
+        // Mark all data obsolete and delete the namespace in the WAL
+        drop_ctx.wal.obsolete(committed_sequence).await?;
+        drop_ctx.wal.delete_namespace().await?;
+
+        // Mark all SSTs deleted
+        let files = current_version.ssts().mark_all_files_deleted();
+        logging::debug!("Try to remove all SSTs {:?}", files);

         Ok(())
     }
@@ -354,6 +406,22 @@ impl<'a, S: LogStore> AlterContext<'a, S> {
     }
 }

+pub struct DropContext<'a, S: LogStore> {
+    pub shared: &'a SharedDataRef,
+    pub wal: &'a Wal<S>,
+    pub manifest: &'a RegionManifest,
+    pub flush_scheduler: &'a FlushSchedulerRef,
+    pub compaction_scheduler: &'a CompactionSchedulerRef<S>,
+    pub sst_layer: &'a AccessLayerRef,
+}
+
+impl<'a, S: LogStore> DropContext<'a, S> {
+    #[inline]
+    fn version_control(&self) -> &VersionControlRef {
+        &self.shared.version_control
+    }
+}
+
 #[derive(Debug)]
 struct WriterInner {
     memtable_builder: MemtableBuilderRef,
diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs
index 1597c3e68f..e3737b81ed 100644
--- a/src/storage/src/sst.rs
+++ b/src/storage/src/sst.rs
@@ -121,6 +121,16 @@ impl LevelMetas {
         merged
     }

+    pub fn mark_all_files_deleted(&self) -> Vec<FileId> {
+        self.levels().iter().fold(vec![], |mut files, level| {
+            files.extend(level.files().map(|f| {
+                f.mark_deleted();
+                f.file_id()
+            }));
+            files
+        })
+    }
+
     pub fn levels(&self) -> &[LevelMeta] {
         &self.levels
     }
diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs
index 9b86f022d4..80dc4183fd 100644
--- a/src/storage/src/wal.rs
+++ b/src/storage/src/wal.rs
@@ -26,8 +26,8 @@ use store_api::storage::{RegionId, SequenceNumber};

 use crate::codec::{Decoder, Encoder};
 use crate::error::{
-    DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalObsoleteSnafu, ReadWalSnafu, Result,
-    WalDataCorruptedSnafu, WriteWalSnafu,
+    DecodeWalHeaderSnafu, DeleteWalNamespaceSnafu, EncodeWalHeaderSnafu, Error,
+    MarkWalObsoleteSnafu, ReadWalSnafu, Result, WalDataCorruptedSnafu, WriteWalSnafu,
 };
 use crate::proto::wal::{self, WalHeader};
 use crate::write_batch::codec::{PayloadDecoder, PayloadEncoder};
@@ -74,6 +74,16 @@ impl Wal {
         })
     }

+    pub async fn delete_namespace(&self) -> Result<()> {
+        self.store
+            .delete_namespace(&self.namespace)
+            .await
+            .map_err(BoxedError::new)
+            .context(DeleteWalNamespaceSnafu {
+                region_id: self.region_id,
+            })
+    }
+
     #[inline]
     pub fn region_id(&self) -> RegionId {
         self.region_id
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index 1cd4453122..459ca9f529 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -54,10 +54,10 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
     ) -> Result, Self::Error>;

     /// Create a new `Namespace`.
-    async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>;
+    async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;

     /// Delete an existing `Namespace` with given ref.
-    async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>;
+    async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;

     /// List all existing namespaces.
     async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs
index 648b15a876..9968158feb 100644
--- a/src/store-api/src/storage/region.rs
+++ b/src/store-api/src/storage/region.rs
@@ -75,6 +75,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {

     async fn close(&self) -> Result<(), Self::Error>;

+    async fn drop_region(&self) -> Result<(), Self::Error>;
+
     fn disk_usage_bytes(&self) -> u64;

     fn region_stat(&self) -> RegionStat {
diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index 5e96832078..1103ba82d7 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -112,6 +112,11 @@ pub trait Table: Send + Sync {
         Ok(())
     }

+    /// Drop regions
+    async fn drop_regions(&self) -> Result<()> {
+        Ok(())
+    }
+
     /// Get region stats in this table.
     fn region_stats(&self) -> Result<Vec<RegionStat>> {
         UnsupportedSnafu {
diff --git a/tests/cases/distributed/common b/tests/cases/distributed/common
deleted file mode 120000
index 2b0920287d..0000000000
--- a/tests/cases/distributed/common
+++ /dev/null
@@ -1 +0,0 @@
-../standalone/common
\ No newline at end of file
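
For reference, the snippet below is a minimal caller-side sketch of the drop path this patch introduces, written against the `StorageEngine` and `Region` traits as they appear above; it is not part of the diff. The helper name `create_then_drop`, its signature, and the import paths are assumptions, while the calls it makes (`create_region`, `Region::name`, `drop_region`, `get_region`) are the ones exercised by `test_drop_region`.

// Hypothetical sketch, not part of the patch: drive the new drop path end-to-end
// against any engine implementing `StorageEngine` (e.g. `EngineImpl<RaftEngineLogStore>`).
use store_api::storage::{CreateOptions, EngineContext, Region, RegionDescriptor, StorageEngine};

async fn create_then_drop<E: StorageEngine>(
    engine: &E,
    desc: RegionDescriptor,
) -> Result<(), E::Error> {
    let ctx = EngineContext::default();

    // Create the region and remember its name before the handle is consumed.
    let region = engine
        .create_region(&ctx, desc, &CreateOptions::default())
        .await?;
    let name = region.name().to_string();

    // `drop_region` takes the region by value: per this patch it writes a `Remove`
    // action to the manifest, obsoletes the WAL data, deletes the region's WAL
    // namespace, marks its SSTs deleted, and unregisters the region from the engine.
    engine.drop_region(&ctx, region).await?;

    // Afterwards the engine no longer knows the region.
    assert!(engine.get_region(&ctx, &name)?.is_none());
    Ok(())
}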