feat: add delete WAL in drop_region (#1577)

* feat: add delete WAL in drop_region

* chore: fix typo err.

* feat: mark all SSTs deleted and remove the region from StorageEngine's region map.

* test: add test_drop_region for StorageEngine.

* chore: make clippy happy

* fix: fix conflict

* chore: CR.

* chore: CR

* chore: fix clippy

* fix: temp file life time
Author: Vanish
Date: 2023-05-18 18:02:34 +08:00 (committed by GitHub)
Parent: d76ddc575f
Commit: 8764ce7845

16 changed files with 257 additions and 26 deletions
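
At a glance, the commit wires region dropping end to end: `MitoTable::drop_regions` drops every region of the table, the storage engine's `drop_region` delegates to the region and then removes it from the engine's region map, and the region writer persists a `RegionMetaAction::Remove`, obsoletes the WAL, deletes the region's WAL namespace, and marks all SSTs deleted. A condensed sketch of that call chain, with illustrative stand-in types — only the method names and their ordering mirror the diff; the real path is async and threads a `DropContext`:

```rust
// Hedged sketch of the drop path this commit introduces; all types here are
// simplified stand-ins, not GreptimeDB's actual structs or signatures.
use std::collections::HashMap;
use std::sync::Mutex;

struct Wal;
impl Wal {
    // Mark all WAL data up to the committed sequence obsolete...
    fn obsolete(&self, _committed_sequence: u64) {}
    // ...then delete the region's WAL namespace entirely.
    fn delete_namespace(&self) {}
}

struct Ssts;
impl Ssts {
    // Flag every SST file for asynchronous purging; returns the affected file ids.
    fn mark_all_files_deleted(&self) -> Vec<u32> {
        Vec::new()
    }
}

struct Region {
    wal: Wal,
    ssts: Ssts,
}

impl Region {
    fn drop_region(&self, committed_sequence: u64) {
        // The real writer first persists a RegionMetaAction::Remove to the
        // manifest so a half-finished drop can be recovered (elided here).
        self.wal.obsolete(committed_sequence);
        self.wal.delete_namespace();
        let _file_ids = self.ssts.mark_all_files_deleted();
    }
}

struct Engine {
    regions: Mutex<HashMap<String, Region>>,
}

impl Engine {
    fn drop_region(&self, name: &str, committed_sequence: u64) {
        let mut regions = self.regions.lock().unwrap();
        if let Some(region) = regions.get(name) {
            region.drop_region(committed_sequence);
        }
        // Finally remove it from the engine's region map.
        regions.remove(name);
    }
}

fn main() {
    let engine = Engine {
        regions: Mutex::new(HashMap::new()),
    };
    engine
        .regions
        .lock()
        .unwrap()
        .insert("region-0".to_string(), Region { wal: Wal, ssts: Ssts });
    engine.drop_region("region-0", 42);
    assert!(engine.regions.lock().unwrap().is_empty());
}
```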

View File

@@ -80,11 +80,11 @@ impl LogStore for NoopLogStore {
)))))
}
-async fn create_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
+async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
Ok(())
}
-async fn delete_namespace(&mut self, _ns: &Self::Namespace) -> Result<()> {
+async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
Ok(())
}
@@ -128,7 +128,7 @@ mod tests {
#[tokio::test]
async fn test_noop_logstore() {
-let mut store = NoopLogStore::default();
+let store = NoopLogStore::default();
let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
store.append(e.clone()).await.unwrap();
store

View File

@@ -220,7 +220,7 @@ impl LogStore for RaftEngineLogStore {
Ok(Box::pin(s))
}
-async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error> {
+async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> {
ensure!(
ns.id != SYSTEM_NAMESPACE,
IllegalNamespaceSnafu { ns: ns.id }
@@ -237,7 +237,7 @@ impl LogStore for RaftEngineLogStore {
Ok(())
}
-async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error> {
+async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> {
ensure!(
ns.id != SYSTEM_NAMESPACE,
IllegalNamespaceSnafu { ns: ns.id }
@@ -343,7 +343,7 @@ mod tests {
#[tokio::test]
async fn test_manage_namespace() {
let dir = create_temp_dir("raft-engine-logstore-test");
-let mut logstore = RaftEngineLogStore::try_new(LogConfig {
+let logstore = RaftEngineLogStore::try_new(LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
})

View File

@@ -614,7 +614,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
// Close the table to close all regions. Closing a region is idempotent.
if let Some((_, table)) = &removed_table {
table
-.close()
+.drop_regions()
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;

View File

@@ -330,6 +330,14 @@ impl<R: Region> Table for MitoTable<R> {
Ok(())
}
+async fn drop_regions(&self) -> TableResult<()> {
+futures::future::try_join_all(self.regions.values().map(|region| region.drop_region()))
+.await
+.map_err(BoxedError::new)
+.context(table_error::TableOperationSnafu)?;
+Ok(())
+}
fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
Ok(self
.regions

View File

@@ -197,6 +197,10 @@ impl Region for MockRegion {
Ok(())
}
+async fn drop_region(&self) -> Result<()> {
+Ok(())
+}
fn disk_usage_bytes(&self) -> u64 {
0
}

View File

@@ -81,8 +81,10 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
self.inner.create_region(descriptor, opts).await
}
-async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
-unimplemented!()
+async fn drop_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> {
+region.drop_region().await?;
+self.inner.remove_reigon(region.name());
+Ok(())
}
fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result<Option<Self::Region>> {
@@ -433,6 +435,10 @@ impl<S: LogStore> EngineInner<S> {
self.regions.get_region(name)
}
+fn remove_reigon(&self, name: &str) {
+self.regions.remove(name)
+}
async fn region_store_config(
&self,
parent_dir: &str,
@@ -494,23 +500,35 @@ impl<S: LogStore> EngineInner<S> {
#[cfg(test)]
mod tests {
-use common_test_util::temp_dir::create_temp_dir;
+use std::ffi::OsStr;
+use std::path::Path;
+use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::type_id::LogicalTypeId;
+use datatypes::vectors::{Float32Vector, Int32Vector, TimestampMillisecondVector, VectorRef};
+use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use object_store::services::Fs;
-use store_api::storage::Region;
+use store_api::storage::{FlushContext, Region, WriteContext, WriteRequest};
use super::*;
use crate::compaction::noop::NoopCompactionScheduler;
use crate::test_util::descriptor_util::RegionDescBuilder;
-#[tokio::test]
-async fn test_create_new_region() {
-let log_file_dir = create_temp_dir("test_engine_wal");
+type TestEngine = EngineImpl<RaftEngineLogStore>;
+type TestRegion = RegionImpl<RaftEngineLogStore>;
+async fn create_engine_and_region(
+tmp_dir: &TempDir,
+log_file_dir: &TempDir,
+region_name: &str,
+region_id: u64,
+ctx: &EngineContext,
+) -> (TestEngine, TestRegion) {
let log_file_dir_path = log_file_dir.path().to_str().unwrap();
let log_store = log_store_util::create_tmp_local_file_log_store(log_file_dir_path).await;
-let dir = create_temp_dir("test_create_new_region");
-let store_dir = dir.path().to_string_lossy();
+let store_dir = tmp_dir.path().to_string_lossy();
let mut builder = Fs::default();
builder.root(&store_dir);
@@ -528,17 +546,40 @@ mod tests {
)
.unwrap();
-let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
+.id(region_id)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_field_column(("v1", LogicalTypeId::Float32, true))
.timestamp(("ts", LogicalTypeId::TimestampMillisecond, false))
.build();
-let ctx = EngineContext::default();
let region = engine
-.create_region(&ctx, desc, &CreateOptions::default())
+.create_region(ctx, desc, &CreateOptions::default())
.await
.unwrap();
+(engine, region)
+}
+fn parquet_file_num(path: &Path) -> usize {
+path.read_dir()
+.unwrap()
+.filter_map(|entry| entry.ok())
+.filter(|entry| entry.path().extension() == Some(OsStr::new("parquet")))
+.count()
+}
+#[tokio::test]
+async fn test_create_new_region() {
+let dir = create_temp_dir("test_create_region");
+let log_file_dir = create_temp_dir("test_engine_wal");
+let region_name = "region-0";
+let region_id = 123456;
+let ctx = EngineContext::default();
+let (engine, region) =
+create_engine_and_region(&dir, &log_file_dir, region_name, region_id, &ctx).await;
assert_eq!(region_name, region.name());
let region2 = engine.get_region(&ctx, region_name).unwrap().unwrap();
@@ -546,4 +587,56 @@ mod tests {
assert!(engine.get_region(&ctx, "no such region").unwrap().is_none());
}
+#[tokio::test]
+async fn test_drop_region() {
+common_telemetry::init_default_ut_logging();
+let dir = create_temp_dir("test_drop_region");
+let log_file_dir = create_temp_dir("test_engine_wal");
+let region_name = "test_region";
+let region_id = 123456;
+let ctx = EngineContext::default();
+let (engine, region) =
+create_engine_and_region(&dir, &log_file_dir, region_name, region_id, &ctx).await;
+assert_eq!(region_name, region.name());
+let mut wb = region.write_request();
+let k1 = Arc::new(Int32Vector::from_slice([1, 2, 3])) as VectorRef;
+let v1 = Arc::new(Float32Vector::from_slice([0.1, 0.2, 0.3])) as VectorRef;
+let tsv = Arc::new(TimestampMillisecondVector::from_slice([0, 0, 0])) as VectorRef;
+let mut put_data = HashMap::with_capacity(4);
+put_data.insert("k1".to_string(), k1);
+put_data.insert("v1".to_string(), v1);
+put_data.insert("ts".to_string(), tsv);
+wb.put(put_data).unwrap();
+region.write(&WriteContext::default(), wb).await.unwrap();
+// Flush memtable to sst.
+region.flush(&FlushContext::default()).await.unwrap();
+engine.close_region(&ctx, region).await.unwrap();
+let dir_path = dir.path().join(region_name);
+assert_eq!(1, parquet_file_num(&dir_path));
+{
+let region = engine
+.open_region(&ctx, region_name, &OpenOptions::default())
+.await
+.unwrap()
+.unwrap();
+engine.drop_region(&ctx, region).await.unwrap();
+assert!(engine.get_region(&ctx, region_name).unwrap().is_none());
+}
+// Wait for gc
+tokio::time::sleep(Duration::from_millis(60)).await;
+assert_eq!(0, parquet_file_num(&dir_path));
+}
}
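
One detail worth noting in `test_drop_region` above: after `drop_region` the test still sleeps briefly ("Wait for gc") before asserting the parquet files are gone. Marking an SST deleted only flags it; physical removal is deferred and happens asynchronously once the last handle to the file is released. A minimal sketch of that mark-then-purge-on-last-drop pattern, assuming reference-counted handles — these types are illustrative, not the engine's:

```rust
// Minimal sketch of deferred SST deletion; illustrative only, not the
// engine's actual FileHandle or purger.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

struct FileInner {
    path: std::path::PathBuf,
    deleted: AtomicBool,
}

impl Drop for FileInner {
    fn drop(&mut self) {
        // Runs only when the LAST clone of the handle goes away. The real
        // engine schedules the removal on a background purge task, which is
        // why the test sleeps before counting files.
        if self.deleted.load(Ordering::Relaxed) {
            let _ = std::fs::remove_file(&self.path);
        }
    }
}

#[derive(Clone)]
struct FileHandle(Arc<FileInner>);

impl FileHandle {
    fn mark_deleted(&self) {
        // Only flips a flag; readers holding a clone can finish their scans.
        self.0.deleted.store(true, Ordering::Relaxed);
    }
}
```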

View File

@@ -240,6 +240,17 @@ pub enum Error {
location: Location,
},
+#[snafu(display(
+"Failed to delete WAL namespace, region id: {}, source: {}",
+region_id,
+source
+))]
+DeleteWalNamespace {
+region_id: RegionId,
+location: Location,
+source: BoxedError,
+},
#[snafu(display(
"Sequence of region should increase monotonically (should be {} < {})",
prev,
@@ -600,6 +611,7 @@ impl ErrorExt for Error {
CreateDefault { source, .. } => source.status_code(),
ConvertChunk { source, .. } => source.status_code(),
MarkWalObsolete { source, .. } => source.status_code(),
+DeleteWalNamespace { source, .. } => source.status_code(),
DecodeParquetTimeRange { .. } => StatusCode::Unexpected,
RateLimited { .. } | StopScheduler { .. } | CompactTaskCancel { .. } => {
StatusCode::Internal

View File

@@ -388,7 +388,7 @@ impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> ManifestImplInn
// It happens when saving checkpoint successfully, but failed at saving checkpoint metadata(the "__last_checkpoint" file).
// Then we try to use the old checkpoint and do the checkpoint next time.
// If the old checkpoint was deleted, it's fine that we return the latest checkpoint.
-// the only side effect is leaving some unused checkpoint checkpoint files,
+// The only side effect is leaving some unused checkpoint files,
// and they will be purged by gc task.
warn!("The checkpoint manifest version {} in {} is greater than checkpoint metadata version {}.", self.store.path(), checkpoint.last_version(), version);

View File

@@ -46,6 +46,7 @@ use crate::manifest::region::RegionManifest;
use crate::memtable::MemtableBuilderRef;
use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef};
pub(crate) use crate::region::writer::schedule_compaction;
+use crate::region::writer::DropContext;
pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext};
use crate::schema::compat::CompatWrite;
use crate::snapshot::SnapshotImpl;
@@ -130,6 +131,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
self.inner.close().await
}
+async fn drop_region(&self) -> Result<()> {
+self.inner.drop_region().await
+}
fn disk_usage_bytes(&self) -> u64 {
let version = self.inner.version_control().current();
version
@@ -675,6 +680,21 @@ impl<S: LogStore> RegionInner<S> {
self.manifest.stop().await
}
+async fn drop_region(&self) -> Result<()> {
+logging::info!("Drop region {}, name: {}", self.shared.id, self.shared.name);
+let drop_ctx = DropContext {
+shared: &self.shared,
+wal: &self.wal,
+manifest: &self.manifest,
+flush_scheduler: &self.flush_scheduler,
+compaction_scheduler: &self.compaction_scheduler,
+sst_layer: &self.sst_layer,
+};
+self.manifest.stop().await?;
+self.writer.on_drop(drop_ctx).await
+}
async fn flush(&self, ctx: &FlushContext) -> Result<()> {
let writer_ctx = WriterContext {
shared: &self.shared,

View File

@@ -33,6 +33,7 @@ use crate::error::{self, Result};
use crate::flush::{FlushHandle, FlushRequest, FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList,
+RegionRemove,
};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::metadata::RegionMetadataRef;
@@ -270,7 +271,58 @@ impl RegionWriter {
self.wait_flush().await?;
-// TODO: canncel the compaction task
+// TODO: cancel the compaction task
Ok(())
}
+pub async fn on_drop<S: LogStore>(&self, drop_ctx: DropContext<'_, S>) -> Result<()> {
+// 1. Acquires the write lock.
+// 2. Closes the writer to reject any further writes.
+// 3. Waits for or cancels the flush job.
+// 4. Adds a `RegionMetaAction::Remove` to the manifest so the drop can be
+// recovered in case of failure; its main job is to guarantee the SST files
+// get cleaned up. If a later step fails, the drop can be resumed through
+// the `Procedure` framework.
+// 5. Marks all data obsolete in the WAL.
+// 6. Deletes the region's namespace from the WAL.
+// 7. Marks all SSTs deleted.
+let mut inner = self.inner.lock().await;
+inner.mark_closed();
+if let Some(handle) = inner.flush_handle.take() {
+handle.wait().await?;
+}
+let version_control = drop_ctx.version_control();
+let _lock = self.version_mutex.lock().await;
+let committed_sequence = version_control.committed_sequence();
+let current_version = version_control.current();
+let mut action_list =
+RegionMetaActionList::with_action(RegionMetaAction::Remove(RegionRemove {
+region_id: drop_ctx.shared.id,
+}));
+// Persist the meta action.
+let prev_version = version_control.current_manifest_version();
+action_list.set_prev_version(prev_version);
+logging::debug!(
+"Try to remove region {}, action_list: {:?}",
+drop_ctx.shared.name(),
+action_list
+);
+drop_ctx.manifest.update(action_list).await?;
+// Mark all data obsolete and delete the namespace in the WAL.
+drop_ctx.wal.obsolete(committed_sequence).await?;
+drop_ctx.wal.delete_namespace().await?;
+// Mark all SSTs deleted.
+let files = current_version.ssts().mark_all_files_deleted();
+logging::debug!("Try to remove all SSTs {:?}", files);
+Ok(())
+}
@@ -354,6 +406,22 @@ impl<'a, S: LogStore> AlterContext<'a, S> {
}
}
+pub struct DropContext<'a, S: LogStore> {
+pub shared: &'a SharedDataRef,
+pub wal: &'a Wal<S>,
+pub manifest: &'a RegionManifest,
+pub flush_scheduler: &'a FlushSchedulerRef<S>,
+pub compaction_scheduler: &'a CompactionSchedulerRef<S>,
+pub sst_layer: &'a AccessLayerRef,
+}
+impl<'a, S: LogStore> DropContext<'a, S> {
+#[inline]
+fn version_control(&self) -> &VersionControlRef {
+&self.shared.version_control
+}
+}
#[derive(Debug)]
struct WriterInner {
memtable_builder: MemtableBuilderRef,

View File

@@ -121,6 +121,16 @@ impl LevelMetas {
merged
}
+pub fn mark_all_files_deleted(&self) -> Vec<FileId> {
+self.levels().iter().fold(vec![], |mut files, level| {
+files.extend(level.files().map(|f| {
+f.mark_deleted();
+f.file_id()
+}));
+files
+})
+}
pub fn levels(&self) -> &[LevelMeta] {
&self.levels
}

View File

@@ -26,8 +26,8 @@ use store_api::storage::{RegionId, SequenceNumber};
use crate::codec::{Decoder, Encoder};
use crate::error::{
-DecodeWalHeaderSnafu, EncodeWalHeaderSnafu, Error, MarkWalObsoleteSnafu, ReadWalSnafu, Result,
-WalDataCorruptedSnafu, WriteWalSnafu,
+DecodeWalHeaderSnafu, DeleteWalNamespaceSnafu, EncodeWalHeaderSnafu, Error,
+MarkWalObsoleteSnafu, ReadWalSnafu, Result, WalDataCorruptedSnafu, WriteWalSnafu,
};
use crate::proto::wal::{self, WalHeader};
use crate::write_batch::codec::{PayloadDecoder, PayloadEncoder};
@@ -74,6 +74,16 @@ impl<S: LogStore> Wal<S> {
})
}
+pub async fn delete_namespace(&self) -> Result<()> {
+self.store
+.delete_namespace(&self.namespace)
+.await
+.map_err(BoxedError::new)
+.context(DeleteWalNamespaceSnafu {
+region_id: self.region_id,
+})
+}
#[inline]
pub fn region_id(&self) -> RegionId {
self.region_id

View File

@@ -54,10 +54,10 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
) -> Result<SendableEntryStream<Self::Entry, Self::Error>, Self::Error>;
/// Create a new `Namespace`.
-async fn create_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>;
+async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;
/// Delete an existing `Namespace` with given ref.
-async fn delete_namespace(&mut self, ns: &Self::Namespace) -> Result<(), Self::Error>;
+async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;
/// List all existing namespaces.
async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
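
A note on the `&mut self` to `&self` change running through this commit (in the trait above and in the `NoopLogStore` and `RaftEngineLogStore` implementations earlier): namespace management now works through a shared reference, which is what a region holding a shared log store handle needs when deleting its WAL namespace. A hedged illustration of why an exclusive receiver would get in the way, using stand-in types:

```rust
// Illustrative only: shows the borrow-checker consequence of the signature
// change, not GreptimeDB's actual trait.
use std::sync::Arc;

trait NamespaceStore {
    // With `&mut self`, these calls could not go through the shared Arc below.
    fn delete_namespace(&self, ns: &str);
}

struct Store;
impl NamespaceStore for Store {
    fn delete_namespace(&self, _ns: &str) {}
}

fn main() {
    let store = Arc::new(Store);
    let wal_a = store.clone(); // e.g. one region's WAL handle
    let wal_b = store;         // another region sharing the same store
    wal_a.delete_namespace("region-a");
    wal_b.delete_namespace("region-b");
}
```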

View File

@@ -75,6 +75,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
async fn close(&self) -> Result<(), Self::Error>;
+async fn drop_region(&self) -> Result<(), Self::Error>;
fn disk_usage_bytes(&self) -> u64;
fn region_stat(&self) -> RegionStat {

View File

@@ -112,6 +112,11 @@ pub trait Table: Send + Sync {
Ok(())
}
+/// Drop regions
+async fn drop_regions(&self) -> Result<()> {
+Ok(())
+}
/// Get region stats in this table.
fn region_stats(&self) -> Result<Vec<RegionStat>> {
UnsupportedSnafu {

View File

@@ -1 +0,0 @@
-../standalone/common