feat: implement truncate region for mito2 (#2335)

* feat: implement truncate region for mito2.

* chore: add license header and fix typos

* Update src/mito2/src/worker/handle_truncate.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* cr

* chore: consider the flush task being executed before truncating the region.

* test

* feat: check flush and compaction tasks

* chore: remove useless changes

* Update src/mito2/src/manifest/action.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/mito2/src/worker/handle_flush.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: CR, consider sequence number

* test: use EventListener to test the flush task during truncate

* fix: fix listener error

* Update src/mito2/src/engine/truncate_test.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: cr

* fix: remove set None

* Update src/mito2/src/region/version.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/mito2/src/worker/handle_flush.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/mito2/src/worker/handle_truncate.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* doc: add some doc for FlushTruncateListener and RegionTruncate

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
Vanish
2023-09-15 21:20:01 +08:00
committed by GitHub
parent 6aec30a1a8
commit 0bd6b9bb39
16 changed files with 623 additions and 14 deletions

View File

@@ -210,7 +210,8 @@ impl RegionServerInner {
| RegionRequest::Delete(_)
| RegionRequest::Alter(_)
| RegionRequest::Flush(_)
| RegionRequest::Compact(_) => RegionChange::None,
| RegionRequest::Compact(_)
| RegionRequest::Truncate(_) => RegionChange::None,
};
let engine = match &region_change {

View File

@@ -34,6 +34,8 @@ pub(crate) mod listener;
mod open_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod truncate_test;
use std::sync::Arc;

View File

@@ -16,17 +16,22 @@
use std::sync::Arc;
use async_trait::async_trait;
use common_telemetry::info;
use store_api::storage::RegionId;
use tokio::sync::Notify;
/// Mito engine background event listener.
#[async_trait]
pub trait EventListener: Send + Sync {
/// Notifies the listener that a region is flushed successfully.
fn on_flush_success(&self, region_id: RegionId);
/// Notifies the listener that the engine is stalled.
fn on_write_stall(&self);
/// Notifies the listener that the region starts to do flush.
async fn on_flush_begin(&self, region_id: RegionId);
}
pub type EventListenerRef = Arc<dyn EventListener>;
@@ -50,6 +55,7 @@ impl FlushListener {
}
}
#[async_trait]
impl EventListener for FlushListener {
fn on_flush_success(&self, region_id: RegionId) {
info!("Region {} flush successfully", region_id);
@@ -58,6 +64,8 @@ impl EventListener for FlushListener {
}
fn on_write_stall(&self) {}
async fn on_flush_begin(&self, _region_id: RegionId) {}
}
/// Listener to watch stall events.
@@ -79,6 +87,7 @@ impl StallListener {
}
}
#[async_trait]
impl EventListener for StallListener {
fn on_flush_success(&self, _region_id: RegionId) {}
@@ -87,4 +96,57 @@ impl EventListener for StallListener {
self.notify.notify_one();
}
async fn on_flush_begin(&self, _region_id: RegionId) {}
}
/// Listener to watch begin flush events.
///
/// Crate a background thread to execute flush region, and the main thread calls `wait_truncate()`
/// to block and wait for `on_flush_region()`.
/// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate
/// region, and background thread thread blocks and waits for `notify_flush()` to continue flushing.
pub struct FlushTruncateListener {
/// Notify flush operation.
notify_flush: Notify,
/// Notify truncate operation.
notify_truncate: Notify,
}
impl FlushTruncateListener {
/// Creates a new listener.
pub fn new() -> FlushTruncateListener {
FlushTruncateListener {
notify_flush: Notify::new(),
notify_truncate: Notify::new(),
}
}
/// Notify flush region to proceed.
pub fn notify_flush(&self) {
self.notify_flush.notify_one();
}
/// Wait for a truncate event.
pub async fn wait_truncate(&self) {
self.notify_truncate.notified().await;
}
}
#[async_trait]
impl EventListener for FlushTruncateListener {
fn on_flush_success(&self, _region_id: RegionId) {}
fn on_write_stall(&self) {}
/// Calling this function will block the thread!
/// Notify the listener to perform a truncate region and block the flush region job.
async fn on_flush_begin(&self, region_id: RegionId) {
info!(
"Region {} begin do flush, notify region to truncate",
region_id
);
self.notify_truncate.notify_one();
self.notify_flush.notified().await;
}
}

View File

@@ -0,0 +1,354 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use common_telemetry::{info, init_default_ut_logging};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::RegionId;
use super::ScanRequest;
use crate::config::MitoConfig;
use crate::engine::listener::FlushTruncateListener;
use crate::test_util::{
build_rows, put_rows, rows_schema, CreateRequestBuilder, MockWriteBufferManager, TestEnv,
};
#[tokio::test]
async fn test_engine_truncate_region_basic() {
let mut env = TestEnv::with_prefix("truncate-basic");
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Put data to the region.
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Scan the region.
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();
// Scan the region.
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "++\n++";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_engine_put_data_after_truncate() {
let mut env = TestEnv::with_prefix("truncate-put");
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Put data to the region.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Scan the region
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();
// Put data to the region again.
let rows = Rows {
schema: column_schemas,
rows: build_rows(5, 8),
};
put_rows(&engine, region_id, rows).await;
// Scan the region.
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_engine_truncate_after_flush() {
let mut env = TestEnv::with_prefix("truncate-flush");
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Put data to the region.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Flush the region.
engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
.unwrap();
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request.clone()).unwrap();
assert_eq!(1, scanner.num_files());
// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();
// Put data to the region.
let rows = Rows {
schema: column_schemas,
rows: build_rows(5, 8),
};
put_rows(&engine, region_id, rows).await;
// Scan the region.
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_engine_truncate_reopen() {
let mut env = TestEnv::with_prefix("truncate-reopen");
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Put data to the region.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
let region = engine.get_region(region_id).unwrap();
let last_entry_id = region.version_control.current().last_entry_id;
// Truncate the region
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();
// Reopen the region again.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert_eq!(last_entry_id, region.version().flushed_entry_id);
// Scan the region.
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "++\n++";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_engine_truncate_during_flush() {
init_default_ut_logging();
let mut env = TestEnv::with_prefix("truncate-during-flush");
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushTruncateListener::new());
let engine = env
.create_engine_with(
MitoConfig::default(),
write_buffer_manager.clone(),
Some(listener.clone()),
)
.await;
// Create the region.
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Put data to the region.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
let region = engine.get_region(region_id).unwrap();
let version_data = region.version_control.current();
let entry_id = version_data.last_entry_id;
let sequence = version_data.committed_sequence;
// Flush reigon.
let engine_cloned = engine.clone();
let flush_task = tokio::spawn(async move {
info!("do flush task!!!!");
engine_cloned
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
});
// Wait truncate before flush memtable.
listener.wait_truncate().await;
// Truncate the region.
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();
// Notify region to continue flushing.
listener.notify_flush();
// Wait handle flushed finish.
let _err = flush_task.await.unwrap().unwrap_err();
// Check sequences and entry id.
let version_data = region.version_control.current();
let truncated_entry_id = version_data.version.truncated_entry_id;
let truncated_sequence = version_data.version.flushed_sequence;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request.clone()).unwrap();
assert_eq!(0, scanner.num_files());
assert_eq!(Some(entry_id), truncated_entry_id);
assert_eq!(sequence, truncated_sequence);
// Reopen the engine.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let current_version = region.version_control.current().version;
assert_eq!(current_version.truncated_entry_id, Some(entry_id));
}

View File

@@ -420,6 +420,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Region {} is truncating, location: {}", region_id, location))]
RegionTruncating {
region_id: RegionId,
location: Location,
},
#[snafu(display(
"Engine write buffer is full, rejecting write requests of region {}, location: {}",
region_id,
@@ -540,6 +546,7 @@ impl ErrorExt for Error {
FlushRegion { source, .. } => source.status_code(),
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,
RegionTruncating { .. } => StatusCode::Cancelled,
BuildCompactionPredicate { .. } => StatusCode::Internal,
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompactRegion { source, .. } => source.status_code(),

View File

@@ -25,7 +25,9 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::access_layer::AccessLayerRef;
use crate::error::{Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, Result};
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatingSnafu, Result,
};
use crate::memtable::MemtableBuilderRef;
use crate::read::Source;
use crate::region::version::{VersionControlData, VersionRef};
@@ -38,6 +40,7 @@ use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;
/// Global write buffer (memtable) manager.
///
@@ -185,6 +188,7 @@ pub(crate) struct RegionFlushTask {
pub(crate) access_layer: AccessLayerRef,
pub(crate) memtable_builder: MemtableBuilderRef,
pub(crate) file_purger: FilePurgerRef,
pub(crate) listener: WorkerListener,
}
impl RegionFlushTask {
@@ -226,6 +230,7 @@ impl RegionFlushTask {
/// Runs the flush task.
async fn do_flush(&mut self, version_data: VersionControlData) {
self.listener.on_flush_begin(self.region_id).await;
let worker_request = match self.flush_memtables(&version_data.version).await {
Ok(file_metas) => {
let memtables_to_remove = version_data
@@ -450,24 +455,33 @@ impl FlushScheduler {
/// Notifies the scheduler that the region is dropped.
pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
// Remove this region.
let Some(flush_status) = self.region_status.remove(&region_id) else {
return;
};
// Notifies all pending tasks.
flush_status.on_failure(Arc::new(RegionDroppedSnafu { region_id }.build()));
self.remove_region_on_failure(
region_id,
Arc::new(RegionDroppedSnafu { region_id }.build()),
);
}
/// Notifies the scheduler that the region is closed.
pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
}
/// Notifies the scheduler that the region is truncating.
pub(crate) fn on_region_truncating(&mut self, region_id: RegionId) {
self.remove_region_on_failure(
region_id,
Arc::new(RegionTruncatingSnafu { region_id }.build()),
);
}
pub(crate) fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
// Remove this region.
let Some(flush_status) = self.region_status.remove(&region_id) else {
return;
};
// Notifies all pending tasks.
flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
flush_status.on_failure(err);
}
/// Add ddl request to pending queue.

View File

@@ -35,6 +35,8 @@ pub enum RegionMetaAction {
Edit(RegionEdit),
/// Remove the region.
Remove(RegionRemove),
/// Truncate the region.
Truncate(RegionTruncate),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -57,6 +59,16 @@ pub struct RegionRemove {
pub region_id: RegionId,
}
/// Last data truncated in the region.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
pub region_id: RegionId,
/// Last WAL entry id of truncated data.
pub truncated_entry_id: EntryId,
// Last sequence number of truncated data.
pub truncated_sequence: SequenceNumber,
}
/// The region manifest data.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionManifest {
@@ -70,6 +82,8 @@ pub struct RegionManifest {
pub flushed_sequence: SequenceNumber,
/// Current manifest version.
pub manifest_version: ManifestVersion,
/// Last WAL entry id of truncated data.
pub truncated_entry_id: Option<EntryId>,
}
#[derive(Debug, Default)]
@@ -79,6 +93,7 @@ pub struct RegionManifestBuilder {
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
manifest_version: ManifestVersion,
truncated_entry_id: Option<EntryId>,
}
impl RegionManifestBuilder {
@@ -91,6 +106,7 @@ impl RegionManifestBuilder {
flushed_entry_id: s.flushed_entry_id,
manifest_version: s.manifest_version,
flushed_sequence: s.flushed_sequence,
truncated_entry_id: s.truncated_entry_id,
}
} else {
Default::default()
@@ -118,6 +134,14 @@ impl RegionManifestBuilder {
}
}
pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
self.manifest_version = manifest_version;
self.flushed_entry_id = truncate.truncated_entry_id;
self.flushed_sequence = truncate.truncated_sequence;
self.truncated_entry_id = Some(truncate.truncated_entry_id);
self.files.clear();
}
/// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata).
pub fn contains_metadata(&self) -> bool {
self.metadata.is_some()
@@ -131,6 +155,7 @@ impl RegionManifestBuilder {
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
manifest_version: self.manifest_version,
truncated_entry_id: self.truncated_entry_id,
})
}
}

View File

@@ -279,6 +279,9 @@ impl RegionManifestManagerInner {
options.manifest_dir, action
);
}
RegionMetaAction::Truncate(action) => {
manifest_builder.apply_truncate(manifest_version, action);
}
}
}
}
@@ -329,6 +332,9 @@ impl RegionManifestManagerInner {
self.manifest.metadata.region_id, action
);
}
RegionMetaAction::Truncate(action) => {
manifest_builder.apply_truncate(version, action);
}
}
}
let new_manifest = manifest_builder.try_build()?;
@@ -400,6 +406,9 @@ impl RegionManifestManagerInner {
self.manifest.metadata.region_id, action
);
}
RegionMetaAction::Truncate(action) => {
manifest_builder.apply_truncate(version, action);
}
}
}
last_version = version;

View File

@@ -150,7 +150,7 @@ async fn manager_with_checkpoint_distance_1() {
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json = "{\"size\":790,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
let expected_json = "{\"size\":816,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);
// reopen the manager

View File

@@ -156,6 +156,7 @@ impl RegionOpener {
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.build();
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));

View File

@@ -138,6 +138,29 @@ impl VersionControl {
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
}
/// Truncate current version.
pub(crate) fn truncate(
&self,
truncated_entry_id: EntryId,
truncated_sequence: SequenceNumber,
memtable_builder: &MemtableBuilderRef,
) {
let version = self.current().version;
let new_mutable = memtable_builder.build(&version.metadata);
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(truncated_entry_id)
.flushed_sequence(truncated_sequence)
.truncated_entry_id(Some(truncated_entry_id))
.build(),
);
let mut version_data = self.data.write().unwrap();
version_data.version.ssts.mark_all_deleted();
version_data.version = new_version;
}
}
pub(crate) type VersionControlRef = Arc<VersionControl>;
@@ -177,6 +200,10 @@ pub(crate) struct Version {
pub(crate) flushed_entry_id: EntryId,
/// Inclusive max sequence of flushed data.
pub(crate) flushed_sequence: SequenceNumber,
/// Latest entry id during the truncating table.
///
/// Used to check if it is a flush task during the truncating table.
pub(crate) truncated_entry_id: Option<EntryId>,
// TODO(yingwen): RegionOptions.
}
@@ -189,6 +216,7 @@ pub(crate) struct VersionBuilder {
ssts: SstVersionRef,
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
truncated_entry_id: Option<EntryId>,
}
impl VersionBuilder {
@@ -200,6 +228,7 @@ impl VersionBuilder {
ssts: Arc::new(SstVersion::new()),
flushed_entry_id: 0,
flushed_sequence: 0,
truncated_entry_id: None,
}
}
@@ -211,6 +240,7 @@ impl VersionBuilder {
ssts: version.ssts.clone(),
flushed_entry_id: version.flushed_entry_id,
flushed_sequence: version.flushed_sequence,
truncated_entry_id: version.truncated_entry_id,
}
}
@@ -238,6 +268,12 @@ impl VersionBuilder {
self
}
/// Sets truncated entty id.
pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.truncated_entry_id = entry_id;
self
}
/// Apply edit to the builder.
pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
if let Some(entry_id) = edit.flushed_entry_id {
@@ -287,6 +323,7 @@ impl VersionBuilder {
ssts: self.ssts,
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncated_entry_id: self.truncated_entry_id,
}
}
}

View File

@@ -35,7 +35,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::{CompactionStrategy, RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -552,6 +552,11 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::Compact(v),
}),
RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::Truncate(v),
}),
};
Ok((worker_request, receiver))
@@ -568,6 +573,7 @@ pub(crate) enum DdlRequest {
Alter(RegionAlterRequest),
Flush(RegionFlushRequest),
Compact(RegionCompactRequest),
Truncate(RegionTruncateRequest),
}
/// Sender and Ddl request.

View File

@@ -21,6 +21,7 @@ mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_open;
mod handle_truncate;
mod handle_write;
use std::collections::hash_map::DefaultHasher;
@@ -503,6 +504,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_compaction_request(ddl.region_id, ddl.sender);
continue;
}
DdlRequest::Truncate(_) => self.handle_truncate_request(ddl.region_id).await,
};
ddl.sender.send(res);
@@ -540,7 +542,7 @@ impl<S> RegionWorkerLoop<S> {
}
/// Wrapper that only calls event listener in tests.
#[derive(Default)]
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
#[cfg(test)]
listener: Option<crate::engine::listener::EventListenerRef>,
@@ -571,6 +573,15 @@ impl WorkerListener {
listener.on_write_stall();
}
}
pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
#[cfg(test)]
if let Some(listener) = &self.listener {
listener.on_flush_begin(region_id).await;
}
// Avoid compiler warning.
let _ = region_id;
}
}
#[cfg(test)]

View File

@@ -19,7 +19,7 @@ use common_time::util::current_time_millis;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::error::{RegionTruncatingSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
@@ -121,6 +121,7 @@ impl<S> RegionWorkerLoop<S> {
access_layer: region.access_layer.clone(),
memtable_builder: self.memtable_builder.clone(),
file_purger: region.file_purger.clone(),
listener: self.listener.clone(),
}
}
}
@@ -136,6 +137,15 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};
// The flush task before truncating the region fails immediately.
let version_data = region.version_control.current();
if let Some(truncated_entry_id) = version_data.version.truncated_entry_id {
if truncated_entry_id >= request.flushed_entry_id {
request.on_failure(RegionTruncatingSnafu { region_id }.build());
return;
}
}
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.file_metas),

View File

@@ -0,0 +1,66 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handling truncate related requests.
use common_query::Output;
use common_telemetry::info;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_truncate_request(&mut self, region_id: RegionId) -> Result<Output> {
let region = self.regions.writable_region(region_id)?;
info!("Try to truncate region {}", region_id);
let version_data = region.version_control.current();
let truncated_entry_id = version_data.last_entry_id;
let truncated_sequence = version_data.committed_sequence;
// Write region truncated to manifest.
let truncate = RegionTruncate {
region_id,
truncated_entry_id,
truncated_sequence,
};
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
region.manifest_manager.update(action_list).await?;
// Notifies flush scheduler.
self.flush_scheduler.on_region_truncating(region_id);
// TODO(DevilExileSu): Notifies compaction scheduler.
// Reset region's version and mark all SSTs deleted.
region.version_control.truncate(
truncated_entry_id,
truncated_sequence,
&self.memtable_builder,
);
// Make all data obsolete.
self.wal.obsolete(region_id, truncated_entry_id).await?;
info!(
"Complete truncating region: {}, entry id: {} and sequence: {}.",
region_id, truncated_entry_id, truncated_sequence
);
Ok(Output::AffectedRows(0))
}
}

View File

@@ -38,6 +38,7 @@ pub enum RegionRequest {
Alter(RegionAlterRequest),
Flush(RegionFlushRequest),
Compact(RegionCompactRequest),
Truncate(RegionTruncateRequest),
}
impl RegionRequest {
@@ -412,6 +413,9 @@ pub struct RegionFlushRequest {}
#[derive(Debug)]
pub struct RegionCompactRequest {}
#[derive(Debug)]
pub struct RegionTruncateRequest {}
#[cfg(test)]
mod tests {
use api::v1::region::RegionColumnDef;