mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
feat: remove all the manifests in drop_region. (#1834)
* feat: drop_region delete manifest file * chore: remove redundant code * chore: fmt * chore: clippy * chore: clippy * feat: support delete_all in manifest. * chore:CR * test: test_drop_basic, test_drop_reopen * chore: cr * fix: typo * chore: cr
This commit is contained in:
@@ -583,28 +583,21 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
// Remove the table from the engine to avoid further access from users.
|
||||
let _lock = self.table_mutex.lock(request.table_id).await;
|
||||
let removed_table = self.tables.remove(&request.table_id);
|
||||
|
||||
// Close the table to close all regions. Closing a region is idempotent.
|
||||
if let Some((_, table)) = &removed_table {
|
||||
let regions = table.region_ids();
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
|
||||
table
|
||||
.drop_regions(®ions)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
let mut regions = table.remove_regions(&table.region_ids()).await?;
|
||||
|
||||
let ctx = StorageEngineContext::default();
|
||||
|
||||
let opts = CloseOptions::default();
|
||||
// Releases regions in storage engine
|
||||
for region_number in regions {
|
||||
self.storage_engine
|
||||
.close_region(&ctx, ®ion_name(table_id, region_number), &opts)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
}
|
||||
let _ = futures::future::try_join_all(
|
||||
regions
|
||||
.drain()
|
||||
.map(|(_, region)| self.storage_engine.drop_region(&ctx, region)),
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
|
||||
@@ -534,16 +534,6 @@ impl<R: Region> MitoTable<R> {
|
||||
Ok(removed)
|
||||
}
|
||||
|
||||
pub async fn drop_regions(&self, region_number: &[RegionNumber]) -> TableResult<()> {
|
||||
let regions = self.remove_regions(region_number).await?;
|
||||
|
||||
let _ = futures::future::try_join_all(regions.values().map(|region| region.drop_region()))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn is_releasable(&self) -> bool {
|
||||
let regions = self.regions.load();
|
||||
|
||||
|
||||
@@ -332,7 +332,7 @@ impl StorageEngine for MockEngine {
|
||||
}
|
||||
|
||||
async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
|
||||
unimplemented!()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result<Option<MockRegion>> {
|
||||
|
||||
@@ -680,7 +680,14 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
engine.drop_region(&ctx, region).await.unwrap();
|
||||
|
||||
assert!(engine.get_region(&ctx, region_name).unwrap().is_none());
|
||||
assert!(!engine
|
||||
.inner
|
||||
.object_store
|
||||
.is_exist(dir_path.join("manifest").to_str().unwrap())
|
||||
.await
|
||||
.unwrap());
|
||||
}
|
||||
|
||||
// Wait for gc
|
||||
|
||||
@@ -321,6 +321,49 @@ impl ManifestLogStorage for ManifestObjectStore {
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn delete_all(&self, remove_action_manifest: ManifestVersion) -> Result<()> {
|
||||
let entries: Vec<Entry> = self.get_paths(Some).await?;
|
||||
|
||||
// Filter out the latest delta file.
|
||||
let paths: Vec<_> = entries
|
||||
.iter()
|
||||
.filter(|e| {
|
||||
let name = e.name();
|
||||
if is_delta_file(name) && file_version(name) == remove_action_manifest {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
})
|
||||
.map(|e| e.path().to_string())
|
||||
.collect();
|
||||
|
||||
logging::info!(
|
||||
"Deleting {} from manifest storage path {} paths: {:?}",
|
||||
paths.len(),
|
||||
self.path,
|
||||
paths,
|
||||
);
|
||||
|
||||
// Delete all files except the latest delta file.
|
||||
self.object_store
|
||||
.remove(paths)
|
||||
.await
|
||||
.with_context(|_| DeleteObjectSnafu {
|
||||
path: self.path.clone(),
|
||||
})?;
|
||||
|
||||
// Delete the latest delta file and the manifest directory.
|
||||
self.object_store
|
||||
.remove_all(&self.path)
|
||||
.await
|
||||
.with_context(|_| DeleteObjectSnafu {
|
||||
path: self.path.clone(),
|
||||
})?;
|
||||
logging::info!("Deleted manifest storage path {}", self.path);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
|
||||
let path = self.delta_file_path(version);
|
||||
logging::debug!("Save log to manifest storage, version: {}", version);
|
||||
|
||||
@@ -28,7 +28,9 @@ use common_time::util;
|
||||
use metrics::{decrement_gauge, increment_gauge};
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
|
||||
use store_api::manifest::{
|
||||
self, Manifest, ManifestLogStorage, ManifestVersion, MetaActionIterator,
|
||||
};
|
||||
use store_api::storage::{
|
||||
AlterRequest, CloseContext, FlushContext, FlushReason, OpenOptions, ReadContext, Region,
|
||||
RegionId, SequenceNumber, WriteContext, WriteResponse,
|
||||
@@ -127,6 +129,7 @@ impl<S: LogStore> Region for RegionImpl<S> {
|
||||
}
|
||||
|
||||
async fn drop_region(&self) -> Result<()> {
|
||||
decrement_gauge!(crate::metrics::REGION_COUNT, 1.0);
|
||||
self.inner.drop_region().await
|
||||
}
|
||||
|
||||
@@ -481,6 +484,22 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
.insert(c.committed_sequence, (manifest_version, c.metadata));
|
||||
version = Some(v);
|
||||
}
|
||||
(RegionMetaAction::Remove(r), Some(v)) => {
|
||||
manifest.stop().await?;
|
||||
|
||||
let files = v.ssts().mark_all_files_deleted();
|
||||
logging::info!(
|
||||
"Try to remove all SSTs, region: {}, files: {:?}",
|
||||
r.region_id,
|
||||
files
|
||||
);
|
||||
|
||||
manifest
|
||||
.manifest_store()
|
||||
.delete_all(v.manifest_version())
|
||||
.await?;
|
||||
return Ok((None, recovered_metadata));
|
||||
}
|
||||
(action, None) => {
|
||||
actions.push((manifest_version, action));
|
||||
version = None;
|
||||
|
||||
@@ -60,12 +60,14 @@ mod alter;
|
||||
mod basic;
|
||||
mod close;
|
||||
mod compact;
|
||||
mod drop;
|
||||
mod flush;
|
||||
mod projection;
|
||||
|
||||
/// Create metadata of a region with schema: (timestamp, v0).
|
||||
pub fn new_metadata(region_name: &str) -> RegionMetadata {
|
||||
let desc = RegionDescBuilder::new(region_name)
|
||||
.id(123)
|
||||
.push_field_column(("v0", LogicalTypeId::String, true))
|
||||
.build();
|
||||
desc.try_into().unwrap()
|
||||
|
||||
@@ -38,7 +38,7 @@ struct CloseTester {
|
||||
base: Option<FileTesterBase>,
|
||||
}
|
||||
|
||||
/// Create a new region for flush test
|
||||
/// Create a new region for close test
|
||||
async fn create_region_for_close(
|
||||
store_dir: &str,
|
||||
flush_strategy: FlushStrategyRef,
|
||||
|
||||
192
src/storage/src/region/tests/drop.rs
Normal file
192
src/storage/src/region/tests/drop.rs
Normal file
@@ -0,0 +1,192 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Region drop tests.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use store_api::manifest::{Manifest, MetaAction};
|
||||
use store_api::storage::{FlushContext, OpenOptions, Region};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine;
|
||||
use crate::flush::FlushStrategyRef;
|
||||
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionRemove};
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
use crate::region::RegionImpl;
|
||||
use crate::test_util::config_util;
|
||||
use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch};
|
||||
|
||||
const REGION_NAME: &str = "region-drop-0";
|
||||
|
||||
/// Create a new region for drop tests.
|
||||
async fn create_region_for_drop(
|
||||
store_dir: &str,
|
||||
flush_strategy: FlushStrategyRef,
|
||||
) -> RegionImpl<RaftEngineLogStore> {
|
||||
let metadata = tests::new_metadata(REGION_NAME);
|
||||
|
||||
let mut store_config =
|
||||
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
|
||||
store_config.flush_strategy = flush_strategy;
|
||||
|
||||
RegionImpl::create(metadata, store_config).await.unwrap()
|
||||
}
|
||||
|
||||
/// Tester for drop tests.
|
||||
struct DropTester {
|
||||
base: Option<FileTesterBase>,
|
||||
}
|
||||
|
||||
impl DropTester {
|
||||
async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> DropTester {
|
||||
let region = create_region_for_drop(store_dir, flush_strategy).await;
|
||||
DropTester {
|
||||
base: Some(FileTesterBase::with_region(region)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn base(&self) -> &FileTesterBase {
|
||||
self.base.as_ref().unwrap()
|
||||
}
|
||||
|
||||
async fn put(&self, data: &[(i64, Option<i64>)]) {
|
||||
let data = data
|
||||
.iter()
|
||||
.map(|(ts, v0)| (*ts, v0.map(|v| v.to_string())))
|
||||
.collect::<Vec<_>>();
|
||||
let _ = self.base().put(&data).await;
|
||||
}
|
||||
|
||||
async fn flush(&self) {
|
||||
let ctx = FlushContext::default();
|
||||
self.base().region.flush(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
async fn close(&mut self) {
|
||||
if let Some(base) = self.base.take() {
|
||||
base.close().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_all_files(path: &str) -> Vec<String> {
|
||||
let mut files = Vec::new();
|
||||
for entry in std::fs::read_dir(path).unwrap() {
|
||||
let entry = entry.unwrap();
|
||||
let path = entry.path();
|
||||
if path.is_file() {
|
||||
files.push(path.to_str().unwrap().to_string());
|
||||
} else if path.is_dir() {
|
||||
files.extend(get_all_files(path.to_str().unwrap()));
|
||||
}
|
||||
}
|
||||
files
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_basic() {
|
||||
let dir = create_temp_dir("drop-basic");
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
|
||||
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
|
||||
let manifest_dir = format!(
|
||||
"{}/{}",
|
||||
store_dir,
|
||||
engine::region_manifest_dir("", REGION_NAME)
|
||||
);
|
||||
let flush_switch = Arc::new(FlushSwitch::default());
|
||||
let mut tester = DropTester::new(store_dir, flush_switch.clone()).await;
|
||||
|
||||
let data = [(1000, Some(100))];
|
||||
|
||||
// Put one element so we have content to flush.
|
||||
tester.put(&data).await;
|
||||
|
||||
// Manually trigger flush.
|
||||
tester.flush().await;
|
||||
|
||||
assert!(has_parquet_file(&sst_dir));
|
||||
|
||||
tester.base().checkpoint_manifest().await;
|
||||
let manifest_files = get_all_files(&manifest_dir);
|
||||
info!("manifest_files: {:?}", manifest_files);
|
||||
|
||||
tester.base().region.drop_region().await.unwrap();
|
||||
tester.close().await;
|
||||
|
||||
assert!(!Path::new(&manifest_dir).exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_reopen() {
|
||||
let dir = create_temp_dir("drop-basic");
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
|
||||
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
|
||||
let manifest_dir = format!(
|
||||
"{}/{}",
|
||||
store_dir,
|
||||
engine::region_manifest_dir("", REGION_NAME)
|
||||
);
|
||||
let flush_switch = Arc::new(FlushSwitch::default());
|
||||
let mut tester = DropTester::new(store_dir, flush_switch.clone()).await;
|
||||
|
||||
let data = [(1000, Some(100))];
|
||||
|
||||
// Put one element so we have content to flush.
|
||||
tester.put(&data).await;
|
||||
// Manually trigger flush.
|
||||
tester.flush().await;
|
||||
|
||||
assert!(has_parquet_file(&sst_dir));
|
||||
|
||||
tester.base().checkpoint_manifest().await;
|
||||
let version_control = tester.base().region.version_control();
|
||||
|
||||
let mut action_list =
|
||||
RegionMetaActionList::with_action(RegionMetaAction::Remove(RegionRemove {
|
||||
region_id: tester.base().region.id(),
|
||||
}));
|
||||
let prev_version = version_control.current_manifest_version();
|
||||
action_list.set_prev_version(prev_version);
|
||||
let manifest = &tester.base().region.inner.manifest;
|
||||
let _ = manifest.update(action_list).await.unwrap();
|
||||
tester.close().await;
|
||||
|
||||
// Reopen the region.
|
||||
let store_config = config_util::new_store_config(
|
||||
REGION_NAME,
|
||||
store_dir,
|
||||
EngineConfig {
|
||||
max_files_in_l0: usize::MAX,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let opts = OpenOptions::default();
|
||||
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(region.is_none());
|
||||
assert!(!Path::new(&manifest_dir).exists());
|
||||
}
|
||||
@@ -21,7 +21,7 @@ use futures::TryStreamExt;
|
||||
use metrics::increment_counter;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
|
||||
use store_api::manifest::{Manifest, ManifestLogStorage, ManifestVersion, MetaAction};
|
||||
use store_api::storage::{
|
||||
AlterRequest, FlushContext, FlushReason, SequenceNumber, WriteContext, WriteResponse,
|
||||
};
|
||||
@@ -289,6 +289,7 @@ impl RegionWriter {
|
||||
// 5. Mark all data obsolete in the WAL.
|
||||
// 6. Delete the namespace of the region from the WAL.
|
||||
// 7. Mark all SSTs deleted.
|
||||
// 8. Remove all manifests.
|
||||
let mut inner = self.inner.lock().await;
|
||||
inner.mark_closed();
|
||||
|
||||
@@ -317,7 +318,7 @@ impl RegionWriter {
|
||||
action_list
|
||||
);
|
||||
|
||||
let _ = drop_ctx.manifest.update(action_list).await?;
|
||||
let remove_action_version = drop_ctx.manifest.update(action_list).await?;
|
||||
|
||||
// Mark all data obsolete and delete the namespace in the WAL
|
||||
drop_ctx.wal.obsolete(committed_sequence).await?;
|
||||
@@ -336,6 +337,11 @@ impl RegionWriter {
|
||||
files
|
||||
);
|
||||
|
||||
drop_ctx
|
||||
.manifest
|
||||
.manifest_store()
|
||||
.delete_all(remove_action_version)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,10 @@ pub trait ManifestLogStorage {
|
||||
keep_last_checkpoint: bool,
|
||||
) -> Result<usize, Self::Error>;
|
||||
|
||||
/// Delete all logs and checkpoints, and remove the manifest directory.
|
||||
/// The delta file corresponding to the `remove_action_version` will be deleted along with the manifest directory at the end.
|
||||
async fn delete_all(&self, remove_action_version: ManifestVersion) -> Result<(), Self::Error>;
|
||||
|
||||
/// Save a log
|
||||
async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<(), Self::Error>;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user