feat: truncate region (#2097)

* feat: impl truncate region

* test: test truncate region

* chore: typo

* refactor: table truncate

* chore: remove useless changes

* chore: reset version

* fix: wait for flush task to complete

* fix: clippy

* chore: remove useless changes

* CR

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/storage/src/engine.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/storage/src/engine.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/storage/src/region.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/storage/src/region/tests/truncate.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/storage/src/region/tests/truncate.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/storage/src/region/writer.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* CR

* Update src/storage/src/engine.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/storage/src/manifest/region.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Vanish
2023-08-04 20:26:25 +08:00
committed by GitHub
parent cb4dd89754
commit cf7e8c9142
21 changed files with 695 additions and 11 deletions

View File

@@ -23,6 +23,7 @@ use table::engine::{CloseTableResult, EngineContext, TableEngine};
use table::metadata::TableId;
use table::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
TruncateTableRequest,
};
use table::test_util::MemTable;
use table::TableRef;
@@ -116,4 +117,12 @@ impl TableEngine for MockTableEngine {
async fn close(&self) -> table::Result<()> {
Ok(())
}
async fn truncate_table(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> table::Result<bool> {
Ok(true)
}
}

View File

@@ -26,7 +26,9 @@ use snafu::ResultExt;
use table::engine::{table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference};
use table::error::TableOperationSnafu;
use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::requests::{
AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest,
};
use table::{error as table_error, Result as TableResult, Table, TableRef};
use tokio::sync::Mutex;
@@ -111,6 +113,14 @@ impl TableEngine for ImmutableFileTableEngine {
async fn close(&self) -> TableResult<()> {
self.inner.close().await
}
async fn truncate_table(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> TableResult<bool> {
Ok(true)
}
}
#[async_trait]

View File

@@ -43,6 +43,7 @@ use table::engine::{
use table::metadata::{TableId, TableInfo, TableVersion};
use table::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
TruncateTableRequest,
};
use table::{error as table_error, Result as TableResult, Table, TableRef};
@@ -184,6 +185,14 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
async fn close(&self) -> TableResult<()> {
self.inner.close().await
}
async fn truncate_table(
&self,
_ctx: &EngineContext,
request: TruncateTableRequest,
) -> TableResult<bool> {
self.inner.truncate_table(request).await
}
}
impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
@@ -704,6 +713,22 @@ impl<S: StorageEngine> MitoEngineInner<S> {
// Partial closed
Ok(CloseTableResult::PartialClosed(removed_regions))
}
async fn truncate_table(&self, request: TruncateTableRequest) -> TableResult<bool> {
let _lock = self.table_mutex.lock(request.table_id).await;
let table_id = request.table_id;
if let Some(table) = self.get_mito_table(table_id) {
table
.truncate()
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(true)
} else {
Ok(false)
}
}
}
impl<S: StorageEngine> MitoEngineInner<S> {

View File

@@ -38,7 +38,7 @@ use table::Table;
use super::*;
use crate::table::test_util::{
self, new_insert_request, setup_table, TestEngineComponents, TABLE_NAME,
self, new_insert_request, new_truncate_request, setup_table, TestEngineComponents, TABLE_NAME,
};
pub fn has_parquet_file(sst_dir: &str) -> bool {
@@ -933,3 +933,63 @@ async fn test_flush_table_with_region_id() {
assert!(has_parquet_file(&region_dir));
}
#[tokio::test]
async fn test_truncate_table() {
common_telemetry::init_default_ut_logging();
let ctx = EngineContext::default();
let TestEngineComponents {
table_engine,
table_ref: table,
dir: _dir,
..
} = test_util::setup_test_engine_and_table().await;
let hosts: VectorRef = Arc::new(StringVector::from(vec!["host1", "host2", "host3", "host4"]));
let cpus: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0]));
let memories: VectorRef = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0, 3.0, 4.0]));
let tss: VectorRef = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 2, 1]));
let columns_values = HashMap::from([
("host".to_string(), hosts.clone()),
("cpu".to_string(), cpus.clone()),
("memory".to_string(), memories.clone()),
("ts".to_string(), tss.clone()),
]);
// Insert data.
let insert_req = new_insert_request("demo".to_string(), columns_values.clone());
assert_eq!(4, table.insert(insert_req).await.unwrap());
// truncate table.
let truncate_req = new_truncate_request();
let res = table_engine
.truncate_table(&ctx, truncate_req)
.await
.unwrap();
assert!(res);
// Verify table is empty.
let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();
let batches = util::collect(stream).await.unwrap();
assert!(batches.is_empty());
// Validate the data insertion again.
let insert_req = new_insert_request("demo".to_string(), columns_values);
assert_eq!(4, table.insert(insert_req).await.unwrap());
let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();
let batches = util::collect_batches(stream).await.unwrap();
assert_eq!(
batches.pretty_print().unwrap(),
"\
+-------+-----+--------+-------------------------+
| host | cpu | memory | ts |
+-------+-----+--------+-------------------------+
| host1 | 1.0 | 1.0 | 1970-01-01T00:00:00.001 |
| host2 | 2.0 | 2.0 | 1970-01-01T00:00:00.002 |
| host3 | 3.0 | 3.0 | 1970-01-01T00:00:00.002 |
| host4 | 4.0 | 4.0 | 1970-01-01T00:00:00.001 |
+-------+-----+--------+-------------------------+"
);
}

View File

@@ -360,6 +360,15 @@ impl<R: Region> Table for MitoTable<R> {
Ok(())
}
async fn truncate(&self) -> TableResult<()> {
let regions = self.regions.load();
let _ = futures::future::try_join_all(regions.values().map(|region| region.truncate()))
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
Ok(())
}
fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
let regions = self.regions.load();

View File

@@ -30,7 +30,8 @@ use storage::EngineImpl;
use table::engine::{EngineContext, TableEngine};
use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, TableOptions,
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest,
TableOptions, TruncateTableRequest,
};
use table::{Table, TableRef};
@@ -142,6 +143,15 @@ pub fn new_drop_request() -> DropTableRequest {
}
}
pub fn new_truncate_request() -> TruncateTableRequest {
TruncateTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
table_id: TABLE_ID,
}
}
pub struct TestEngineComponents {
pub table_engine: MitoEngine<EngineImpl<NoopLogStore>>,
pub storage_engine: EngineImpl<NoopLogStore>,

View File

@@ -209,6 +209,10 @@ impl Region for MockRegion {
async fn compact(&self, _ctx: &CompactContext) -> std::result::Result<(), Self::Error> {
unimplemented!()
}
async fn truncate(&self) -> Result<()> {
unimplemented!()
}
}
impl MockRegionInner {

View File

@@ -539,7 +539,10 @@ mod tests {
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use object_store::services::Fs;
use store_api::storage::{FlushContext, Region, WriteContext, WriteRequest};
use store_api::storage::{
ChunkReader, FlushContext, ReadContext, Region, ScanRequest, Snapshot, WriteContext,
WriteRequest,
};
use super::*;
use crate::compaction::noop::NoopCompactionScheduler;
@@ -698,4 +701,50 @@ mod tests {
tokio::time::sleep(Duration::from_millis(60)).await;
assert_eq!(0, parquet_file_num(&dir_path));
}
#[tokio::test]
async fn test_truncate_region() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("test_truncate_region");
let log_file_dir = create_temp_dir("test_engine_wal");
let region_name = "test_region";
let region_id = 123456;
let config = EngineConfig::default();
let (engine, region) =
create_engine_and_region(&dir, &log_file_dir, region_name, region_id, config).await;
assert_eq!(region_name, region.name());
let mut wb = region.write_request();
let k1 = Arc::new(Int32Vector::from_slice([1, 2, 3])) as VectorRef;
let v1 = Arc::new(Float32Vector::from_slice([0.1, 0.2, 0.3])) as VectorRef;
let tsv = Arc::new(TimestampMillisecondVector::from_slice([0, 0, 0])) as VectorRef;
let put_data = HashMap::from([
("k1".to_string(), k1),
("v1".to_string(), v1),
("ts".to_string(), tsv),
]);
wb.put(put_data).unwrap();
// Insert data.
region.write(&WriteContext::default(), wb).await.unwrap();
let ctx = EngineContext::default();
// Truncate region.
region.truncate().await.unwrap();
assert!(engine.get_region(&ctx, region.name()).unwrap().is_some());
// Scan to verify the region is empty.
let read_ctx = ReadContext::default();
let snapshot = region.snapshot(&read_ctx).unwrap();
let resp = snapshot
.scan(&read_ctx, ScanRequest::default())
.await
.unwrap();
let mut reader = resp.reader;
assert!(reader.next_chunk().await.unwrap().is_none());
}
}

View File

@@ -79,6 +79,12 @@ pub struct RegionEdit {
pub compaction_time_window: Option<i64>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
pub region_id: RegionId,
pub committed_sequence: SequenceNumber,
}
/// The region version checkpoint
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionVersion {
@@ -190,6 +196,7 @@ pub enum RegionMetaAction {
Change(RegionChange),
Remove(RegionRemove),
Edit(RegionEdit),
Truncate(RegionTruncate),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]

View File

@@ -584,4 +584,95 @@ mod tests {
manifest.stop().await.unwrap();
}
#[tokio::test]
async fn test_region_manifest_truncate() {
common_telemetry::init_default_ut_logging();
let manifest = new_fs_manifest(false, None).await;
let region_meta = Arc::new(build_region_meta());
let committed_sequence = 99;
let file = FileId::random();
let file_ids = vec![FileId::random(), FileId::random()];
// Save some actions.
let actions: Vec<RegionMetaActionList> = vec![
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: region_meta.as_ref().into(),
committed_sequence: 1,
})),
RegionMetaActionList::new(vec![
RegionMetaAction::Edit(build_region_edit(2, &[file], &[])),
RegionMetaAction::Edit(build_region_edit(3, &file_ids, &[file])),
]),
RegionMetaActionList::with_action(RegionMetaAction::Truncate(RegionTruncate {
region_id: 0.into(),
committed_sequence,
})),
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: region_meta.as_ref().into(),
committed_sequence: 1,
})),
];
for action in actions {
manifest.update(action).await.unwrap();
}
// Scan manifest.
let mut iter = manifest.scan(0, MAX_VERSION).await.unwrap();
let (v, action_list) = iter.next_action().await.unwrap().unwrap();
info!("action_list = {:?}", action_list.actions);
assert_eq!(0, v);
assert_eq!(2, action_list.actions.len());
let protocol = &action_list.actions[0];
assert!(matches!(
protocol,
RegionMetaAction::Protocol(ProtocolAction { .. })
));
let change = &action_list.actions[1];
assert!(matches!(
change,
RegionMetaAction::Change(RegionChange {
committed_sequence: 1,
..
})
));
let (v, action_list) = iter.next_action().await.unwrap().unwrap();
assert_eq!(1, v);
assert_eq!(2, action_list.actions.len());
assert!(matches!(&action_list.actions[0], RegionMetaAction::Edit(_)));
assert!(matches!(&action_list.actions[1], RegionMetaAction::Edit(_)));
let (v, action_list) = iter.next_action().await.unwrap().unwrap();
assert_eq!(2, v);
assert_eq!(1, action_list.actions.len());
let truncate = &action_list.actions[0];
assert!(matches!(
truncate,
RegionMetaAction::Truncate(RegionTruncate {
committed_sequence: 99,
..
})
));
let (v, action_list) = iter.next_action().await.unwrap().unwrap();
assert_eq!(3, v);
assert_eq!(1, action_list.actions.len());
let change = &action_list.actions[0];
assert!(matches!(
change,
RegionMetaAction::Change(RegionChange {
committed_sequence: 1,
..
})
));
// Reach end
assert!(iter.next_action().await.unwrap().is_none());
}
}

View File

@@ -74,3 +74,10 @@ pub fn build_region_edit(
compaction_time_window: None,
}
}
pub fn build_region_truncate(committed_sequence: u64) -> RegionTruncate {
RegionTruncate {
region_id: 0.into(),
committed_sequence,
}
}

View File

@@ -47,16 +47,16 @@ use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionCheckpoint, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::region::RegionManifest;
use crate::memtable::MemtableBuilderRef;
use crate::memtable::{MemtableBuilderRef, MemtableVersion};
use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef};
pub(crate) use crate::region::writer::schedule_compaction;
use crate::region::writer::DropContext;
pub use crate::region::writer::{
AlterContext, RegionWriter, RegionWriterRef, WriterCompactRequest, WriterContext,
};
use crate::region::writer::{DropContext, TruncateContext};
use crate::schema::compat::CompatWrite;
use crate::snapshot::SnapshotImpl;
use crate::sst::AccessLayerRef;
use crate::sst::{AccessLayerRef, LevelMetas};
use crate::version::{
Version, VersionControl, VersionControlRef, VersionEdit, INIT_COMMITTED_SEQUENCE,
};
@@ -154,6 +154,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
async fn compact(&self, ctx: &CompactContext) -> std::result::Result<(), Self::Error> {
self.inner.compact(ctx).await
}
async fn truncate(&self) -> Result<()> {
self.inner.truncate().await
}
}
/// Storage related config for region.
@@ -502,6 +506,27 @@ impl<S: LogStore> RegionImpl<S> {
.await?;
return Ok((None, recovered_metadata));
}
(RegionMetaAction::Truncate(t), Some(mut v)) => {
let files = v.ssts().mark_all_files_deleted();
logging::info!(
"Try to remove all SSTs on truncate, region: {}, files: {:?}",
t.region_id,
files
);
let region_metadata = v.metadata().clone();
let memtables = Arc::new(MemtableVersion::new(
memtable_builder.build(region_metadata.schema().clone()),
));
let ssts =
Arc::new(LevelMetas::new(sst_layer.clone(), file_purger.clone()));
v.reset(
v.manifest_version() + 1,
memtables,
ssts,
t.committed_sequence,
);
version = Some(v);
}
(action, None) => {
actions.push((manifest_version, action));
version = None;
@@ -769,4 +794,22 @@ impl<S: LogStore> RegionInner<S> {
})
.await
}
async fn truncate(&self) -> Result<()> {
logging::info!(
"Truncate region {}, name: {}",
self.shared.id,
self.shared.name
);
let ctx = TruncateContext {
shared: &self.shared,
wal: &self.wal,
manifest: &self.manifest,
sst_layer: &self.sst_layer,
};
self.writer.truncate(&ctx).await?;
Ok(())
}
}

View File

@@ -63,6 +63,7 @@ mod compact;
mod drop;
mod flush;
mod projection;
mod truncate;
/// Create metadata of a region with schema: (timestamp, v0).
pub fn new_metadata(region_name: &str) -> RegionMetadata {

View File

@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Region truncate tests.
use std::sync::Arc;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::manifest::{Manifest, MetaAction};
use store_api::storage::{FlushContext, OpenOptions, Region};
use crate::config::EngineConfig;
use crate::engine;
use crate::flush::FlushStrategyRef;
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate};
use crate::region::tests::{self, FileTesterBase};
use crate::region::RegionImpl;
use crate::test_util::config_util;
use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch};
const REGION_NAME: &str = "region-truncate-0";
/// Create a new region for truncate tests.
async fn create_region_for_truncate(
store_dir: &str,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<RaftEngineLogStore> {
let metadata = tests::new_metadata(REGION_NAME);
let mut store_config =
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
store_config.flush_strategy = flush_strategy;
RegionImpl::create(metadata, store_config).await.unwrap()
}
/// Tester for truncate tests.
struct TruncateTester {
store_dir: String,
base: Option<FileTesterBase>,
}
impl TruncateTester {
async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> TruncateTester {
let region = create_region_for_truncate(store_dir, flush_strategy).await;
TruncateTester {
store_dir: store_dir.to_string(),
base: Some(FileTesterBase::with_region(region)),
}
}
#[inline]
fn base(&self) -> &FileTesterBase {
self.base.as_ref().unwrap()
}
async fn flush(&self) {
let ctx = FlushContext::default();
self.base().region.flush(&ctx).await.unwrap();
}
async fn truncate(&self) {
self.base().region.truncate().await.unwrap();
}
async fn reopen(&mut self) {
// Close the old region.
if let Some(base) = self.base.as_ref() {
base.close().await;
}
self.base = None;
// Reopen the region.
let store_config = config_util::new_store_config(
REGION_NAME,
&self.store_dir,
EngineConfig {
max_files_in_l0: usize::MAX,
..Default::default()
},
)
.await;
let opts = OpenOptions::default();
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
.await
.unwrap()
.unwrap();
self.base = Some(FileTesterBase::with_region(region));
}
}
#[tokio::test]
async fn test_truncate_basic() {
let dir = create_temp_dir("truncate-basic");
common_telemetry::init_default_ut_logging();
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let tester = TruncateTester::new(store_dir, flush_switch.clone()).await;
let data = [
(1000, Some("1000".to_string())),
(1001, Some("1001".to_string())),
(1002, Some("1002".to_string())),
(1003, Some("1003".to_string())),
];
// Data in Memtable
tester.base().put(&data).await;
let res = tester.base().full_scan().await;
assert_eq!(4, res.len());
// Truncate region.
tester.truncate().await;
let res = tester.base().full_scan().await;
assert_eq!(0, res.len());
}
#[tokio::test]
async fn test_put_data_after_truncate() {
let dir = create_temp_dir("put_data_after_truncate");
common_telemetry::init_default_ut_logging();
let store_dir = dir.path().to_str().unwrap();
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
let flush_switch = Arc::new(FlushSwitch::default());
let tester = TruncateTester::new(store_dir, flush_switch.clone()).await;
let data = [
(1000, Some("1000".to_string())),
(1001, Some("1001".to_string())),
(1002, None),
(1003, Some("1003".to_string())),
];
tester.base().put(&data).await;
// Manually trigger flush.
tester.flush().await;
assert!(has_parquet_file(&sst_dir));
let data = [
(1002, Some("1002".to_string())),
(1004, Some("1004".to_string())),
(1005, Some("1005".to_string())),
];
tester.base().put(&data).await;
// Truncate region.
tester.truncate().await;
let res = tester.base().full_scan().await;
assert_eq!(0, res.len());
let new_data = [
(1010, Some("0".to_string())),
(1011, Some("1".to_string())),
(1012, Some("2".to_string())),
(1013, Some("3".to_string())),
];
tester.base().put(&new_data).await;
let res = tester.base().full_scan().await;
assert_eq!(new_data, res.as_slice());
}
#[tokio::test]
async fn test_truncate_reopen() {
let dir = create_temp_dir("put_data_after_truncate");
common_telemetry::init_default_ut_logging();
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let mut tester = TruncateTester::new(store_dir, flush_switch.clone()).await;
let data = [
(1000, Some("1000".to_string())),
(1001, Some("1001".to_string())),
(1002, None),
(1003, Some("1003".to_string())),
];
tester.base().put(&data).await;
// Manually trigger flush.
tester.flush().await;
let data = [
(1002, Some("1002".to_string())),
(1004, Some("1004".to_string())),
(1005, Some("1005".to_string())),
];
tester.base().put(&data).await;
let manifest = &tester.base().region.inner.manifest;
let manifest_version = tester
.base()
.region
.version_control()
.current_manifest_version();
let committed_sequence = tester.base().committed_sequence();
let mut action_list =
RegionMetaActionList::with_action(RegionMetaAction::Truncate(RegionTruncate {
region_id: 0.into(),
committed_sequence,
}));
// Persist the meta action.
let prev_version = manifest_version;
action_list.set_prev_version(prev_version);
assert!(manifest.update(action_list).await.is_ok());
// Reopen and put data.
tester.reopen().await;
let res = tester.base().full_scan().await;
assert_eq!(0, res.len());
let new_data = [
(0, Some("0".to_string())),
(1, Some("1".to_string())),
(2, Some("2".to_string())),
(3, Some("3".to_string())),
];
tester.base().put(&new_data).await;
let res = tester.base().full_scan().await;
assert_eq!(new_data, res.as_slice());
}

View File

@@ -35,9 +35,9 @@ use crate::flush::{
};
use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList,
RegionRemove,
RegionRemove, RegionTruncate,
};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef, MemtableVersion};
use crate::metadata::RegionMetadataRef;
use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL, PREPROCESS_ELAPSED};
use crate::proto::wal::WalHeader;
@@ -45,7 +45,7 @@ use crate::region::{
CompactContext, RecoveredMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
};
use crate::schema::compat::CompatWrite;
use crate::sst::AccessLayerRef;
use crate::sst::{AccessLayerRef, LevelMetas};
use crate::version::{VersionControl, VersionControlRef, VersionEdit};
use crate::wal::Wal;
use crate::write_batch::WriteBatch;
@@ -397,6 +397,55 @@ where
Ok(())
}
pub async fn truncate(&self, ctx: &TruncateContext<'_, S>) -> Result<()> {
// Acquires the write lock.
let mut inner = self.inner.lock().await;
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
if let Some(handle) = inner.flush_handle.take() {
handle.wait().await?;
}
let version_control = ctx.version_control();
let _lock = self.version_mutex.lock().await;
let committed_sequence = version_control.committed_sequence();
// Add `RegionMetaAction::Truncate` to recover from manifest in case of failure.
let mut action_list =
RegionMetaActionList::with_action(RegionMetaAction::Truncate(RegionTruncate {
region_id: ctx.shared.id,
committed_sequence,
}));
// Persist the meta action.
let current_version = version_control.current();
let manifest_version = version_control.current_manifest_version();
let prev_version = manifest_version;
action_list.set_prev_version(prev_version);
ctx.manifest.update(action_list).await?;
// Mark all data obsolete
ctx.wal.obsolete(committed_sequence).await?;
// Mark all SSTs deleted
let files = current_version.ssts().mark_all_files_deleted();
logging::info!(
"Try to remove all SSTs, region: {}, files: {:?}",
ctx.shared.id(),
files
);
// Reset version
let memtables = Arc::new(MemtableVersion::new(inner.alloc_memtable(version_control)));
let ssts = Arc::new(LevelMetas::new(
ctx.sst_layer.clone(),
current_version.ssts().file_purger(),
));
version_control.reset_version(manifest_version + 1, memtables, ssts);
Ok(())
}
}
// Methods for tests.
@@ -468,6 +517,20 @@ impl<'a, S: LogStore> DropContext<'a, S> {
}
}
pub struct TruncateContext<'a, S: LogStore> {
pub shared: &'a SharedDataRef,
pub wal: &'a Wal<S>,
pub manifest: &'a RegionManifest,
pub sst_layer: &'a AccessLayerRef,
}
impl<'a, S: LogStore> TruncateContext<'a, S> {
#[inline]
fn version_control(&self) -> &VersionControlRef {
&self.shared.version_control
}
}
#[derive(Debug)]
struct WriterInner {
memtable_builder: MemtableBuilderRef,

View File

@@ -150,6 +150,10 @@ impl LevelMetas {
pub fn levels(&self) -> &[LevelMeta] {
&self.levels
}
pub fn file_purger(&self) -> FilePurgerRef {
self.file_purger.clone()
}
}
/// Metadata of files in same SST level.

View File

@@ -125,6 +125,17 @@ impl VersionControl {
version_to_update.apply_metadata(metadata, manifest_version);
version_to_update.commit();
}
pub fn reset_version(
&self,
manifest_version: ManifestVersion,
memtables: MemtableVersionRef,
ssts: LevelMetasRef,
) {
let mut version_to_update = self.version.lock();
version_to_update.reset(manifest_version, memtables, ssts, 0);
version_to_update.commit();
}
}
#[derive(Debug)]
@@ -307,6 +318,19 @@ impl Version {
pub fn manifest_version(&self) -> ManifestVersion {
self.manifest_version
}
pub fn reset(
&mut self,
manifest_version: ManifestVersion,
memtables: MemtableVersionRef,
ssts: LevelMetasRef,
flushed_sequence: SequenceNumber,
) {
self.memtables = memtables;
self.ssts = ssts;
self.manifest_version = manifest_version;
self.flushed_sequence = flushed_sequence;
}
}
#[cfg(test)]

View File

@@ -88,6 +88,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
async fn flush(&self, ctx: &FlushContext) -> Result<(), Self::Error>;
async fn compact(&self, ctx: &CompactContext) -> Result<(), Self::Error>;
async fn truncate(&self) -> Result<(), Self::Error>;
}
#[derive(Default, Debug)]

View File

@@ -23,6 +23,7 @@ use crate::error::{self, Result};
use crate::metadata::TableId;
use crate::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
TruncateTableRequest,
};
use crate::TableRef;
pub mod manager;
@@ -132,6 +133,12 @@ pub trait TableEngine: Send + Sync {
/// Close the engine.
async fn close(&self) -> Result<()>;
async fn truncate_table(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> Result<bool>;
}
pub type TableEngineRef = Arc<dyn TableEngine>;

View File

@@ -130,6 +130,13 @@ pub trait Table: Send + Sync {
}
.fail()?
}
async fn truncate(&self) -> Result<()> {
UnsupportedSnafu {
operation: "TRUNCATE",
}
.fail()?
}
}
pub type TableRef = Arc<dyn Table>;

View File

@@ -21,7 +21,9 @@ use tokio::sync::Mutex;
use crate::engine::{EngineContext, TableEngine, TableEngineProcedure};
use crate::metadata::TableId;
use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use crate::requests::{
AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest,
};
use crate::test_util::EmptyTable;
use crate::{Result, TableRef};
@@ -103,6 +105,14 @@ impl TableEngine for MockTableEngine {
async fn close(&self) -> Result<()> {
Ok(())
}
async fn truncate_table(
&self,
_ctx: &EngineContext,
_request: TruncateTableRequest,
) -> Result<bool> {
Ok(true)
}
}
impl TableEngineProcedure for MockTableEngine {