From cf7e8c9142c107efbbad863b742dc49a7b006169 Mon Sep 17 00:00:00 2001 From: Vanish Date: Fri, 4 Aug 2023 20:26:25 +0800 Subject: [PATCH] feat: truncate region (#2097) * feat: impl truncate region * test: test truncate region * chore: typo * refactor: table truncate * chore: remove useless changes * chore: reset version * fix: wait for flush task to complete * fix: clippy * chore: remove useless changes * CR Co-authored-by: Yingwen * Update src/storage/src/engine.rs Co-authored-by: Yingwen * Update src/storage/src/engine.rs Co-authored-by: Yingwen * Update src/storage/src/region.rs Co-authored-by: Yingwen * Update src/storage/src/region/tests/truncate.rs Co-authored-by: Yingwen * Update src/storage/src/region/tests/truncate.rs Co-authored-by: Yingwen * Update src/storage/src/region/writer.rs Co-authored-by: Yingwen * CR * Update src/storage/src/engine.rs Co-authored-by: Yingwen * Update src/storage/src/manifest/region.rs Co-authored-by: Yingwen --------- Co-authored-by: Yingwen --- src/catalog/src/remote/mock.rs | 9 + src/file-table-engine/src/engine/immutable.rs | 12 +- src/mito/src/engine.rs | 25 ++ src/mito/src/engine/tests.rs | 62 ++++- src/mito/src/table.rs | 9 + src/mito/src/table/test_util.rs | 12 +- src/mito/src/table/test_util/mock_engine.rs | 4 + src/storage/src/engine.rs | 51 +++- src/storage/src/manifest/action.rs | 7 + src/storage/src/manifest/region.rs | 91 +++++++ src/storage/src/manifest/test_utils.rs | 7 + src/storage/src/region.rs | 49 +++- src/storage/src/region/tests.rs | 1 + src/storage/src/region/tests/truncate.rs | 242 ++++++++++++++++++ src/storage/src/region/writer.rs | 69 ++++- src/storage/src/sst.rs | 4 + src/storage/src/version.rs | 24 ++ src/store-api/src/storage/region.rs | 2 + src/table/src/engine.rs | 7 + src/table/src/table.rs | 7 + src/table/src/test_util/mock_engine.rs | 12 +- 21 files changed, 695 insertions(+), 11 deletions(-) create mode 100644 src/storage/src/region/tests/truncate.rs diff --git a/src/catalog/src/remote/mock.rs 
b/src/catalog/src/remote/mock.rs index b321f8818e..2f98ea018b 100644 --- a/src/catalog/src/remote/mock.rs +++ b/src/catalog/src/remote/mock.rs @@ -23,6 +23,7 @@ use table::engine::{CloseTableResult, EngineContext, TableEngine}; use table::metadata::TableId; use table::requests::{ AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, + TruncateTableRequest, }; use table::test_util::MemTable; use table::TableRef; @@ -116,4 +117,12 @@ impl TableEngine for MockTableEngine { async fn close(&self) -> table::Result<()> { Ok(()) } + + async fn truncate_table( + &self, + _ctx: &EngineContext, + _request: TruncateTableRequest, + ) -> table::Result<bool> { + Ok(true) + } } diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index ed51ce770e..f3e97854ca 100644 --- a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -26,7 +26,9 @@ use snafu::ResultExt; use table::engine::{table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference}; use table::error::TableOperationSnafu; use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; -use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; +use table::requests::{ + AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest, +}; use table::{error as table_error, Result as TableResult, Table, TableRef}; use tokio::sync::Mutex; @@ -111,6 +113,14 @@ impl TableEngine for ImmutableFileTableEngine { async fn close(&self) -> TableResult<()> { self.inner.close().await } + + async fn truncate_table( + &self, + _ctx: &EngineContext, + _request: TruncateTableRequest, + ) -> TableResult<bool> { + Ok(true) + } } #[async_trait] diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 5198486159..5e83015e09 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -43,6 
+43,7 @@ use table::engine::{ use table::metadata::{TableId, TableInfo, TableVersion}; use table::requests::{ AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, + TruncateTableRequest, }; use table::{error as table_error, Result as TableResult, Table, TableRef}; @@ -184,6 +185,14 @@ impl TableEngine for MitoEngine { async fn close(&self) -> TableResult<()> { self.inner.close().await } + + async fn truncate_table( + &self, + _ctx: &EngineContext, + request: TruncateTableRequest, + ) -> TableResult<bool> { + self.inner.truncate_table(request).await + } } impl TableEngineProcedure for MitoEngine { @@ -704,6 +713,22 @@ impl MitoEngineInner { // Partial closed Ok(CloseTableResult::PartialClosed(removed_regions)) } + + async fn truncate_table(&self, request: TruncateTableRequest) -> TableResult<bool> { + let _lock = self.table_mutex.lock(request.table_id).await; + + let table_id = request.table_id; + if let Some(table) = self.get_mito_table(table_id) { + table + .truncate() + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + Ok(true) + } else { + Ok(false) + } + } } impl MitoEngineInner { diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index c924a97cc9..6be3715ae6 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -38,7 +38,7 @@ use table::Table; use super::*; use crate::table::test_util::{ - self, new_insert_request, setup_table, TestEngineComponents, TABLE_NAME, + self, new_insert_request, new_truncate_request, setup_table, TestEngineComponents, TABLE_NAME, }; pub fn has_parquet_file(sst_dir: &str) -> bool { @@ -933,3 +933,63 @@ async fn test_flush_table_with_region_id() { assert!(has_parquet_file(&region_dir)); } + +#[tokio::test] +async fn test_truncate_table() { + common_telemetry::init_default_ut_logging(); + let ctx = EngineContext::default(); + let TestEngineComponents { + table_engine, + table_ref: table, + dir: _dir, + .. 
+ } = test_util::setup_test_engine_and_table().await; + + let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2", "host3", "host4"])); + let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0])); + let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0])); + let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 2, 1])); + let columns_values = HashMap::from([ + ("host".to_string(), hosts.clone()), + ("cpu".to_string(), cpus.clone()), + ("memory".to_string(), memories.clone()), + ("ts".to_string(), tss.clone()), + ]); + + // Insert data. + let insert_req = new_insert_request("demo".to_string(), columns_values.clone()); + assert_eq!(4, table.insert(insert_req).await.unwrap()); + + // truncate table. + let truncate_req = new_truncate_request(); + let res = table_engine + .truncate_table(&ctx, truncate_req) + .await + .unwrap(); + assert!(res); + + // Verify table is empty. + let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert!(batches.is_empty()); + + // Validate the data insertion again. 
+ let insert_req = new_insert_request("demo".to_string(), columns_values); + assert_eq!(4, table.insert(insert_req).await.unwrap()); + + let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap(); + let batches = util::collect_batches(stream).await.unwrap(); + + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+-----+--------+-------------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+-------------------------+ +| host1 | 1.0 | 1.0 | 1970-01-01T00:00:00.001 | +| host2 | 2.0 | 2.0 | 1970-01-01T00:00:00.002 | +| host3 | 3.0 | 3.0 | 1970-01-01T00:00:00.002 | +| host4 | 4.0 | 4.0 | 1970-01-01T00:00:00.001 | ++-------+-----+--------+-------------------------+" + ); +} diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 4d8a3133f2..80fbc19d4e 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -360,6 +360,15 @@ impl Table for MitoTable { Ok(()) } + async fn truncate(&self) -> TableResult<()> { + let regions = self.regions.load(); + let _ = futures::future::try_join_all(regions.values().map(|region| region.truncate())) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + Ok(()) + } + fn region_stats(&self) -> TableResult> { let regions = self.regions.load(); diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index e439973e13..3a2d2e8c31 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -30,7 +30,8 @@ use storage::EngineImpl; use table::engine::{EngineContext, TableEngine}; use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::{ - AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, TableOptions, + AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, + TableOptions, TruncateTableRequest, }; use table::{Table, TableRef}; @@ -142,6 +143,15 @@ pub fn new_drop_request() -> DropTableRequest { } } +pub fn 
new_truncate_request() -> TruncateTableRequest { + TruncateTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TABLE_NAME.to_string(), + table_id: TABLE_ID, + } +} + pub struct TestEngineComponents { pub table_engine: MitoEngine>, pub storage_engine: EngineImpl, diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index 4dadedd7b2..3781871814 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -209,6 +209,10 @@ impl Region for MockRegion { async fn compact(&self, _ctx: &CompactContext) -> std::result::Result<(), Self::Error> { unimplemented!() } + + async fn truncate(&self) -> Result<()> { + unimplemented!() + } } impl MockRegionInner { diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 19fe282bdd..d6079ecc8e 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -539,7 +539,10 @@ mod tests { use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; use object_store::services::Fs; - use store_api::storage::{FlushContext, Region, WriteContext, WriteRequest}; + use store_api::storage::{ + ChunkReader, FlushContext, ReadContext, Region, ScanRequest, Snapshot, WriteContext, + WriteRequest, + }; use super::*; use crate::compaction::noop::NoopCompactionScheduler; @@ -698,4 +701,50 @@ mod tests { tokio::time::sleep(Duration::from_millis(60)).await; assert_eq!(0, parquet_file_num(&dir_path)); } + + #[tokio::test] + async fn test_truncate_region() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("test_truncate_region"); + let log_file_dir = create_temp_dir("test_engine_wal"); + + let region_name = "test_region"; + let region_id = 123456; + let config = EngineConfig::default(); + + let (engine, region) = + create_engine_and_region(&dir, &log_file_dir, region_name, region_id, 
config).await; + + assert_eq!(region_name, region.name()); + + let mut wb = region.write_request(); + let k1 = Arc::new(Int32Vector::from_slice([1, 2, 3])) as VectorRef; + let v1 = Arc::new(Float32Vector::from_slice([0.1, 0.2, 0.3])) as VectorRef; + let tsv = Arc::new(TimestampMillisecondVector::from_slice([0, 0, 0])) as VectorRef; + + let put_data = HashMap::from([ + ("k1".to_string(), k1), + ("v1".to_string(), v1), + ("ts".to_string(), tsv), + ]); + wb.put(put_data).unwrap(); + + // Insert data. + region.write(&WriteContext::default(), wb).await.unwrap(); + let ctx = EngineContext::default(); + + // Truncate region. + region.truncate().await.unwrap(); + assert!(engine.get_region(&ctx, region.name()).unwrap().is_some()); + + // Scan to verify the region is empty. + let read_ctx = ReadContext::default(); + let snapshot = region.snapshot(&read_ctx).unwrap(); + let resp = snapshot + .scan(&read_ctx, ScanRequest::default()) + .await + .unwrap(); + let mut reader = resp.reader; + assert!(reader.next_chunk().await.unwrap().is_none()); + } } diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index ba8bdc12d5..3158ee94ae 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -79,6 +79,12 @@ pub struct RegionEdit { pub compaction_time_window: Option, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RegionTruncate { + pub region_id: RegionId, + pub committed_sequence: SequenceNumber, +} + /// The region version checkpoint #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionVersion { @@ -190,6 +196,7 @@ pub enum RegionMetaAction { Change(RegionChange), Remove(RegionRemove), Edit(RegionEdit), + Truncate(RegionTruncate), } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 9f3f6b8544..cacee02201 100644 --- a/src/storage/src/manifest/region.rs +++ 
b/src/storage/src/manifest/region.rs @@ -584,4 +584,95 @@ mod tests { manifest.stop().await.unwrap(); } + + #[tokio::test] + async fn test_region_manifest_truncate() { + common_telemetry::init_default_ut_logging(); + + let manifest = new_fs_manifest(false, None).await; + let region_meta = Arc::new(build_region_meta()); + let committed_sequence = 99; + + let file = FileId::random(); + let file_ids = vec![FileId::random(), FileId::random()]; + + // Save some actions. + let actions: Vec = vec![ + RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { + metadata: region_meta.as_ref().into(), + committed_sequence: 1, + })), + RegionMetaActionList::new(vec![ + RegionMetaAction::Edit(build_region_edit(2, &[file], &[])), + RegionMetaAction::Edit(build_region_edit(3, &file_ids, &[file])), + ]), + RegionMetaActionList::with_action(RegionMetaAction::Truncate(RegionTruncate { + region_id: 0.into(), + committed_sequence, + })), + RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { + metadata: region_meta.as_ref().into(), + committed_sequence: 1, + })), + ]; + + for action in actions { + manifest.update(action).await.unwrap(); + } + + // Scan manifest. + let mut iter = manifest.scan(0, MAX_VERSION).await.unwrap(); + + let (v, action_list) = iter.next_action().await.unwrap().unwrap(); + info!("action_list = {:?}", action_list.actions); + assert_eq!(0, v); + assert_eq!(2, action_list.actions.len()); + let protocol = &action_list.actions[0]; + assert!(matches!( + protocol, + RegionMetaAction::Protocol(ProtocolAction { .. }) + )); + + let change = &action_list.actions[1]; + assert!(matches!( + change, + RegionMetaAction::Change(RegionChange { + committed_sequence: 1, + .. 
+ }) + )); + + let (v, action_list) = iter.next_action().await.unwrap().unwrap(); + assert_eq!(1, v); + assert_eq!(2, action_list.actions.len()); + assert!(matches!(&action_list.actions[0], RegionMetaAction::Edit(_))); + assert!(matches!(&action_list.actions[1], RegionMetaAction::Edit(_))); + + let (v, action_list) = iter.next_action().await.unwrap().unwrap(); + assert_eq!(2, v); + assert_eq!(1, action_list.actions.len()); + let truncate = &action_list.actions[0]; + assert!(matches!( + truncate, + RegionMetaAction::Truncate(RegionTruncate { + committed_sequence: 99, + .. + }) + )); + + let (v, action_list) = iter.next_action().await.unwrap().unwrap(); + assert_eq!(3, v); + assert_eq!(1, action_list.actions.len()); + let change = &action_list.actions[0]; + assert!(matches!( + change, + RegionMetaAction::Change(RegionChange { + committed_sequence: 1, + .. + }) + )); + + // Reach end + assert!(iter.next_action().await.unwrap().is_none()); + } } diff --git a/src/storage/src/manifest/test_utils.rs b/src/storage/src/manifest/test_utils.rs index 8d8b7ef693..35b6d2fb0b 100644 --- a/src/storage/src/manifest/test_utils.rs +++ b/src/storage/src/manifest/test_utils.rs @@ -74,3 +74,10 @@ pub fn build_region_edit( compaction_time_window: None, } } + +pub fn build_region_truncate(committed_sequence: u64) -> RegionTruncate { + RegionTruncate { + region_id: 0.into(), + committed_sequence, + } +} diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index a35ec67129..3545e17139 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -47,16 +47,16 @@ use crate::manifest::action::{ RawRegionMetadata, RegionChange, RegionCheckpoint, RegionMetaAction, RegionMetaActionList, }; use crate::manifest::region::RegionManifest; -use crate::memtable::MemtableBuilderRef; +use crate::memtable::{MemtableBuilderRef, MemtableVersion}; use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef}; pub(crate) use crate::region::writer::schedule_compaction; -use 
crate::region::writer::DropContext; pub use crate::region::writer::{ AlterContext, RegionWriter, RegionWriterRef, WriterCompactRequest, WriterContext, }; +use crate::region::writer::{DropContext, TruncateContext}; use crate::schema::compat::CompatWrite; use crate::snapshot::SnapshotImpl; -use crate::sst::AccessLayerRef; +use crate::sst::{AccessLayerRef, LevelMetas}; use crate::version::{ Version, VersionControl, VersionControlRef, VersionEdit, INIT_COMMITTED_SEQUENCE, }; @@ -154,6 +154,10 @@ impl Region for RegionImpl { async fn compact(&self, ctx: &CompactContext) -> std::result::Result<(), Self::Error> { self.inner.compact(ctx).await } + + async fn truncate(&self) -> Result<()> { + self.inner.truncate().await + } } /// Storage related config for region. @@ -502,6 +506,27 @@ impl RegionImpl { .await?; return Ok((None, recovered_metadata)); } + (RegionMetaAction::Truncate(t), Some(mut v)) => { + let files = v.ssts().mark_all_files_deleted(); + logging::info!( + "Try to remove all SSTs on truncate, region: {}, files: {:?}", + t.region_id, + files + ); + let region_metadata = v.metadata().clone(); + let memtables = Arc::new(MemtableVersion::new( + memtable_builder.build(region_metadata.schema().clone()), + )); + let ssts = + Arc::new(LevelMetas::new(sst_layer.clone(), file_purger.clone())); + v.reset( + v.manifest_version() + 1, + memtables, + ssts, + t.committed_sequence, + ); + version = Some(v); + } (action, None) => { actions.push((manifest_version, action)); version = None; @@ -769,4 +794,22 @@ impl RegionInner { }) .await } + + async fn truncate(&self) -> Result<()> { + logging::info!( + "Truncate region {}, name: {}", + self.shared.id, + self.shared.name + ); + + let ctx = TruncateContext { + shared: &self.shared, + wal: &self.wal, + manifest: &self.manifest, + sst_layer: &self.sst_layer, + }; + + self.writer.truncate(&ctx).await?; + Ok(()) + } } diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 9eeae2f729..36025e0097 100644 
--- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -63,6 +63,7 @@ mod compact; mod drop; mod flush; mod projection; +mod truncate; /// Create metadata of a region with schema: (timestamp, v0). pub fn new_metadata(region_name: &str) -> RegionMetadata { diff --git a/src/storage/src/region/tests/truncate.rs b/src/storage/src/region/tests/truncate.rs new file mode 100644 index 0000000000..100e990c4d --- /dev/null +++ b/src/storage/src/region/tests/truncate.rs @@ -0,0 +1,242 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Region truncate tests. + +use std::sync::Arc; + +use common_test_util::temp_dir::create_temp_dir; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use store_api::manifest::{Manifest, MetaAction}; +use store_api::storage::{FlushContext, OpenOptions, Region}; + +use crate::config::EngineConfig; +use crate::engine; +use crate::flush::FlushStrategyRef; +use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate}; +use crate::region::tests::{self, FileTesterBase}; +use crate::region::RegionImpl; +use crate::test_util::config_util; +use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch}; + +const REGION_NAME: &str = "region-truncate-0"; + +/// Create a new region for truncate tests. 
+async fn create_region_for_truncate( + store_dir: &str, + flush_strategy: FlushStrategyRef, +) -> RegionImpl { + let metadata = tests::new_metadata(REGION_NAME); + + let mut store_config = + config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await; + store_config.flush_strategy = flush_strategy; + + RegionImpl::create(metadata, store_config).await.unwrap() +} + +/// Tester for truncate tests. +struct TruncateTester { + store_dir: String, + base: Option, +} + +impl TruncateTester { + async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> TruncateTester { + let region = create_region_for_truncate(store_dir, flush_strategy).await; + TruncateTester { + store_dir: store_dir.to_string(), + base: Some(FileTesterBase::with_region(region)), + } + } + + #[inline] + fn base(&self) -> &FileTesterBase { + self.base.as_ref().unwrap() + } + + async fn flush(&self) { + let ctx = FlushContext::default(); + self.base().region.flush(&ctx).await.unwrap(); + } + + async fn truncate(&self) { + self.base().region.truncate().await.unwrap(); + } + + async fn reopen(&mut self) { + // Close the old region. + if let Some(base) = self.base.as_ref() { + base.close().await; + } + self.base = None; + // Reopen the region. 
+ let store_config = config_util::new_store_config( + REGION_NAME, + &self.store_dir, + EngineConfig { + max_files_in_l0: usize::MAX, + ..Default::default() + }, + ) + .await; + + let opts = OpenOptions::default(); + let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts) + .await + .unwrap() + .unwrap(); + + self.base = Some(FileTesterBase::with_region(region)); + } +} + +#[tokio::test] +async fn test_truncate_basic() { + let dir = create_temp_dir("truncate-basic"); + common_telemetry::init_default_ut_logging(); + let store_dir = dir.path().to_str().unwrap(); + + let flush_switch = Arc::new(FlushSwitch::default()); + let tester = TruncateTester::new(store_dir, flush_switch.clone()).await; + + let data = [ + (1000, Some("1000".to_string())), + (1001, Some("1001".to_string())), + (1002, Some("1002".to_string())), + (1003, Some("1003".to_string())), + ]; + + // Data in Memtable + tester.base().put(&data).await; + let res = tester.base().full_scan().await; + assert_eq!(4, res.len()); + + // Truncate region. + tester.truncate().await; + + let res = tester.base().full_scan().await; + assert_eq!(0, res.len()); +} + +#[tokio::test] +async fn test_put_data_after_truncate() { + let dir = create_temp_dir("put_data_after_truncate"); + common_telemetry::init_default_ut_logging(); + let store_dir = dir.path().to_str().unwrap(); + + let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); + let flush_switch = Arc::new(FlushSwitch::default()); + let tester = TruncateTester::new(store_dir, flush_switch.clone()).await; + + let data = [ + (1000, Some("1000".to_string())), + (1001, Some("1001".to_string())), + (1002, None), + (1003, Some("1003".to_string())), + ]; + + tester.base().put(&data).await; + + // Manually trigger flush. 
+ tester.flush().await; + assert!(has_parquet_file(&sst_dir)); + + let data = [ + (1002, Some("1002".to_string())), + (1004, Some("1004".to_string())), + (1005, Some("1005".to_string())), + ]; + tester.base().put(&data).await; + + // Truncate region. + tester.truncate().await; + let res = tester.base().full_scan().await; + assert_eq!(0, res.len()); + + let new_data = [ + (1010, Some("0".to_string())), + (1011, Some("1".to_string())), + (1012, Some("2".to_string())), + (1013, Some("3".to_string())), + ]; + tester.base().put(&new_data).await; + + let res = tester.base().full_scan().await; + assert_eq!(new_data, res.as_slice()); +} + +#[tokio::test] +async fn test_truncate_reopen() { + let dir = create_temp_dir("put_data_after_truncate"); + common_telemetry::init_default_ut_logging(); + let store_dir = dir.path().to_str().unwrap(); + + let flush_switch = Arc::new(FlushSwitch::default()); + let mut tester = TruncateTester::new(store_dir, flush_switch.clone()).await; + + let data = [ + (1000, Some("1000".to_string())), + (1001, Some("1001".to_string())), + (1002, None), + (1003, Some("1003".to_string())), + ]; + + tester.base().put(&data).await; + + // Manually trigger flush. + tester.flush().await; + + let data = [ + (1002, Some("1002".to_string())), + (1004, Some("1004".to_string())), + (1005, Some("1005".to_string())), + ]; + tester.base().put(&data).await; + + let manifest = &tester.base().region.inner.manifest; + let manifest_version = tester + .base() + .region + .version_control() + .current_manifest_version(); + + let committed_sequence = tester.base().committed_sequence(); + let mut action_list = + RegionMetaActionList::with_action(RegionMetaAction::Truncate(RegionTruncate { + region_id: 0.into(), + committed_sequence, + })); + + // Persist the meta action. + let prev_version = manifest_version; + action_list.set_prev_version(prev_version); + assert!(manifest.update(action_list).await.is_ok()); + + // Reopen and put data. 
+ tester.reopen().await; + let res = tester.base().full_scan().await; + assert_eq!(0, res.len()); + + let new_data = [ + (0, Some("0".to_string())), + (1, Some("1".to_string())), + (2, Some("2".to_string())), + (3, Some("3".to_string())), + ]; + + tester.base().put(&new_data).await; + let res = tester.base().full_scan().await; + assert_eq!(new_data, res.as_slice()); +} diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 78f425fc50..f78cf46e70 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -35,9 +35,9 @@ use crate::flush::{ }; use crate::manifest::action::{ RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, - RegionRemove, + RegionRemove, RegionTruncate, }; -use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef}; +use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef, MemtableVersion}; use crate::metadata::RegionMetadataRef; use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL, PREPROCESS_ELAPSED}; use crate::proto::wal::WalHeader; @@ -45,7 +45,7 @@ use crate::region::{ CompactContext, RecoveredMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef, }; use crate::schema::compat::CompatWrite; -use crate::sst::AccessLayerRef; +use crate::sst::{AccessLayerRef, LevelMetas}; use crate::version::{VersionControl, VersionControlRef, VersionEdit}; use crate::wal::Wal; use crate::write_batch::WriteBatch; @@ -397,6 +397,55 @@ where Ok(()) } + + pub async fn truncate(&self, ctx: &TruncateContext<'_, S>) -> Result<()> { + // Acquires the write lock. 
+ let mut inner = self.inner.lock().await; + ensure!(!inner.is_closed(), error::ClosedRegionSnafu); + + if let Some(handle) = inner.flush_handle.take() { + handle.wait().await?; + } + + let version_control = ctx.version_control(); + let _lock = self.version_mutex.lock().await; + let committed_sequence = version_control.committed_sequence(); + + // Add `RegionMetaAction::Truncate` to recover from manifest in case of failure. + let mut action_list = + RegionMetaActionList::with_action(RegionMetaAction::Truncate(RegionTruncate { + region_id: ctx.shared.id, + committed_sequence, + })); + + // Persist the meta action. + let current_version = version_control.current(); + let manifest_version = version_control.current_manifest_version(); + let prev_version = manifest_version; + action_list.set_prev_version(prev_version); + ctx.manifest.update(action_list).await?; + + // Mark all data obsolete + ctx.wal.obsolete(committed_sequence).await?; + + // Mark all SSTs deleted + let files = current_version.ssts().mark_all_files_deleted(); + logging::info!( + "Try to remove all SSTs, region: {}, files: {:?}", + ctx.shared.id(), + files + ); + + // Reset version + let memtables = Arc::new(MemtableVersion::new(inner.alloc_memtable(version_control))); + let ssts = Arc::new(LevelMetas::new( + ctx.sst_layer.clone(), + current_version.ssts().file_purger(), + )); + version_control.reset_version(manifest_version + 1, memtables, ssts); + + Ok(()) + } } // Methods for tests. 
@@ -468,6 +517,20 @@ impl<'a, S: LogStore> DropContext<'a, S> { } } +pub struct TruncateContext<'a, S: LogStore> { + pub shared: &'a SharedDataRef, + pub wal: &'a Wal, + pub manifest: &'a RegionManifest, + pub sst_layer: &'a AccessLayerRef, +} + +impl<'a, S: LogStore> TruncateContext<'a, S> { + #[inline] + fn version_control(&self) -> &VersionControlRef { + &self.shared.version_control + } +} + #[derive(Debug)] struct WriterInner { memtable_builder: MemtableBuilderRef, diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 0412ae560d..2d59ed1c94 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -150,6 +150,10 @@ impl LevelMetas { pub fn levels(&self) -> &[LevelMeta] { &self.levels } + + pub fn file_purger(&self) -> FilePurgerRef { + self.file_purger.clone() + } } /// Metadata of files in same SST level. diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index d5d3d4bf3e..f614f320a9 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -125,6 +125,17 @@ impl VersionControl { version_to_update.apply_metadata(metadata, manifest_version); version_to_update.commit(); } + + pub fn reset_version( + &self, + manifest_version: ManifestVersion, + memtables: MemtableVersionRef, + ssts: LevelMetasRef, + ) { + let mut version_to_update = self.version.lock(); + version_to_update.reset(manifest_version, memtables, ssts, 0); + version_to_update.commit(); + } } #[derive(Debug)] @@ -307,6 +318,19 @@ impl Version { pub fn manifest_version(&self) -> ManifestVersion { self.manifest_version } + + pub fn reset( + &mut self, + manifest_version: ManifestVersion, + memtables: MemtableVersionRef, + ssts: LevelMetasRef, + flushed_sequence: SequenceNumber, + ) { + self.memtables = memtables; + self.ssts = ssts; + self.manifest_version = manifest_version; + self.flushed_sequence = flushed_sequence; + } } #[cfg(test)] diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index 
ac6037e066..66f09be0c8 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -88,6 +88,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { async fn flush(&self, ctx: &FlushContext) -> Result<(), Self::Error>; async fn compact(&self, ctx: &CompactContext) -> Result<(), Self::Error>; + + async fn truncate(&self) -> Result<(), Self::Error>; } #[derive(Default, Debug)] diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 98f1c3a533..2121aa32b7 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -23,6 +23,7 @@ use crate::error::{self, Result}; use crate::metadata::TableId; use crate::requests::{ AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, + TruncateTableRequest, }; use crate::TableRef; pub mod manager; @@ -132,6 +133,12 @@ pub trait TableEngine: Send + Sync { /// Close the engine. async fn close(&self) -> Result<()>; + + async fn truncate_table( + &self, + _ctx: &EngineContext, + _request: TruncateTableRequest, + ) -> Result<bool>; } pub type TableEngineRef = Arc<dyn TableEngine>; diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 34d7265a9b..70fdc90ffa 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -130,6 +130,13 @@ pub trait Table: Send + Sync { } .fail()? } + + async fn truncate(&self) -> Result<()> { + UnsupportedSnafu { + operation: "TRUNCATE", + } + .fail()? 
+ } } pub type TableRef = Arc<dyn Table>; diff --git a/src/table/src/test_util/mock_engine.rs b/src/table/src/test_util/mock_engine.rs index 4850849fd9..d6be35d8ec 100644 --- a/src/table/src/test_util/mock_engine.rs +++ b/src/table/src/test_util/mock_engine.rs @@ -21,7 +21,9 @@ use tokio::sync::Mutex; use crate::engine::{EngineContext, TableEngine, TableEngineProcedure}; use crate::metadata::TableId; -use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; +use crate::requests::{ + AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest, +}; use crate::test_util::EmptyTable; use crate::{Result, TableRef}; @@ -103,6 +105,14 @@ impl TableEngine for MockTableEngine { async fn close(&self) -> Result<()> { Ok(()) } + + async fn truncate_table( + &self, + _ctx: &EngineContext, + _request: TruncateTableRequest, + ) -> Result<bool> { + Ok(true) + } } impl TableEngineProcedure for MockTableEngine {