From eddff1752315aa42188a4e9fc0f808cf5394343d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 1 Sep 2023 05:17:32 -0500 Subject: [PATCH] feat: drop region in mito2 (#2286) * basic impl Signed-off-by: Ruihang Xia * check in opening region Signed-off-by: Ruihang Xia * fix typo Signed-off-by: Ruihang Xia * add test Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * fix typo again Signed-off-by: Ruihang Xia * Update src/mito2/src/worker/handle_drop.rs Co-authored-by: JeremyHi * remove file in order Signed-off-by: Ruihang Xia * fix remove logic Signed-off-by: Ruihang Xia * use scan to list files Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: JeremyHi --- src/mito2/src/engine.rs | 8 ++ src/mito2/src/engine/close_test.rs | 51 ++++++++ src/mito2/src/engine/create_test.rs | 77 ++++++++++++ src/mito2/src/engine/drop_test.rs | 71 +++++++++++ src/mito2/src/engine/open_test.rs | 104 ++++++++++++++++ src/mito2/src/engine/tests.rs | 174 +-------------------------- src/mito2/src/region/version.rs | 10 ++ src/mito2/src/region_write_ctx.rs | 1 + src/mito2/src/sst/file.rs | 1 - src/mito2/src/sst/version.rs | 9 ++ src/mito2/src/worker.rs | 12 +- src/mito2/src/worker/handle_drop.rs | 147 ++++++++++++++++++++++ src/mito2/src/worker/handle_flush.rs | 1 + src/mito2/src/worker/handle_open.rs | 20 ++- 14 files changed, 506 insertions(+), 180 deletions(-) create mode 100644 src/mito2/src/engine/close_test.rs create mode 100644 src/mito2/src/engine/create_test.rs create mode 100644 src/mito2/src/engine/drop_test.rs create mode 100644 src/mito2/src/engine/open_test.rs create mode 100644 src/mito2/src/worker/handle_drop.rs diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 503433e36f..22760fa194 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -14,6 +14,14 @@ //! Mito region engine. +#[cfg(test)] +mod close_test; +#[cfg(test)] +mod create_test; +#[cfg(test)] +mod drop_test; +#[cfg(test)] +mod open_test; #[cfg(test)] mod tests; diff --git a/src/mito2/src/engine/close_test.rs b/src/mito2/src/engine/close_test.rs new file mode 100644 index 0000000000..5ab27300a3 --- /dev/null +++ b/src/mito2/src/engine/close_test.rs @@ -0,0 +1,51 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use store_api::region_request::{RegionCloseRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::test_util::{CreateRequestBuilder, TestEnv}; + +#[tokio::test] +async fn test_engine_close_region() { + let mut env = TestEnv::with_prefix("close"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // It's okay to close a region doesn't exist. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Close the created region. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + assert!(!engine.is_region_exists(region_id)); + + // It's okay to close this region again. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); +} diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs new file mode 100644 index 0000000000..f4ad9fabad --- /dev/null +++ b/src/mito2/src/engine/create_test.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use store_api::region_request::RegionRequest; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::error::Error; +use crate::test_util::{CreateRequestBuilder, TestEnv}; + +#[tokio::test] +async fn test_engine_create_new_region() { + let mut env = TestEnv::with_prefix("new-region"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + assert!(engine.is_region_exists(region_id)); +} + +#[tokio::test] +async fn test_engine_create_region_if_not_exists() { + let mut env = TestEnv::with_prefix("create-not-exists"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new().create_if_not_exists(true); + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); + + // Create the same region again. + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_engine_create_existing_region() { + let mut env = TestEnv::with_prefix("create-existing"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); + + // Create the same region again. + let err = engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap_err(); + assert!( + matches!(err, Error::RegionExists { .. }), + "unexpected err: {err}" + ); +} diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs new file mode 100644 index 0000000000..1f6f372770 --- /dev/null +++ b/src/mito2/src/engine/drop_test.rs @@ -0,0 +1,71 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use object_store::util::join_path; +use store_api::region_request::{RegionDropRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::test_util::{CreateRequestBuilder, TestEnv}; +use crate::worker::DROPPING_MARKER_FILE; + +#[tokio::test] +async fn test_engine_drop_region() { + let mut env = TestEnv::with_prefix("drop"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // It's okay to drop a region doesn't exist. + engine + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap_err(); + + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + let region_dir = region.access_layer.region_dir().to_owned(); + // no dropping marker file + assert!(!env + .get_object_store() + .unwrap() + .is_exist(&join_path(®ion_dir, DROPPING_MARKER_FILE)) + .await + .unwrap()); + + // create a parquet file + env.get_object_store() + .unwrap() + .write(&join_path(®ion_dir, "blabla.parquet"), vec![]) + .await + .unwrap(); + + // drop the created region. + engine + .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap(); + assert!(!engine.is_region_exists(region_id)); + // the drop marker is not removed yet + assert!(env + .get_object_store() + .unwrap() + .is_exist(&join_path(®ion_dir, DROPPING_MARKER_FILE)) + .await + .unwrap()); +} diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs new file mode 100644 index 0000000000..e06ea9eca4 --- /dev/null +++ b/src/mito2/src/engine/open_test.rs @@ -0,0 +1,104 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::config::MitoConfig; +use crate::error::Error; +use crate::test_util::{CreateRequestBuilder, TestEnv}; + +#[tokio::test] +async fn test_engine_open_empty() { + let mut env = TestEnv::with_prefix("open-empty"); + let engine = env.create_engine(MitoConfig::default()).await; + + let err = engine + .handle_request( + RegionId::new(1, 1), + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir: "empty".to_string(), + options: HashMap::default(), + }), + ) + .await + .unwrap_err(); + assert!( + matches!(err, Error::RegionNotFound { .. }), + "unexpected err: {err}" + ); +} + +#[tokio::test] +async fn test_engine_open_existing() { + let mut env = TestEnv::with_prefix("open-exiting"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_engine_reopen_region() { + let mut env = TestEnv::with_prefix("reopen-region"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Close the region. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + // Open the region again. + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::default(), + }), + ) + .await + .unwrap(); + assert!(engine.is_region_exists(region_id)); +} diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index 3211cc88a1..67a4e1c6cb 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -22,8 +22,7 @@ use api::v1::{ColumnSchema, Row, Rows, SemanticType}; use common_recordbatch::RecordBatches; use store_api::metadata::ColumnMetadata; use store_api::region_request::{ - RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionOpenRequest, - RegionPutRequest, + RegionCreateRequest, RegionDeleteRequest, RegionOpenRequest, RegionPutRequest, }; use store_api::storage::RegionId; @@ -59,177 +58,6 @@ async fn test_engine_new_stop() { ); } -#[tokio::test] -async fn test_engine_create_new_region() { - let mut env = TestEnv::with_prefix("new-region"); - let engine = env.create_engine(MitoConfig::default()).await; - - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - assert!(engine.is_region_exists(region_id)); -} - -#[tokio::test] -async fn test_engine_create_region_if_not_exists() { - let mut env = TestEnv::with_prefix("create-not-exists"); - let engine = env.create_engine(MitoConfig::default()).await; - - let region_id = RegionId::new(1, 1); - let builder = CreateRequestBuilder::new().create_if_not_exists(true); - engine - .handle_request(region_id, RegionRequest::Create(builder.build())) - .await - .unwrap(); - - // Create the same region again. - engine - .handle_request(region_id, RegionRequest::Create(builder.build())) - .await - .unwrap(); -} - -#[tokio::test] -async fn test_engine_create_existing_region() { - let mut env = TestEnv::with_prefix("create-existing"); - let engine = env.create_engine(MitoConfig::default()).await; - - let region_id = RegionId::new(1, 1); - let builder = CreateRequestBuilder::new(); - engine - .handle_request(region_id, RegionRequest::Create(builder.build())) - .await - .unwrap(); - - // Create the same region again. - let err = engine - .handle_request(region_id, RegionRequest::Create(builder.build())) - .await - .unwrap_err(); - assert!( - matches!(err, Error::RegionExists { .. }), - "unexpected err: {err}" - ); -} - -#[tokio::test] -async fn test_engine_open_empty() { - let mut env = TestEnv::with_prefix("open-empty"); - let engine = env.create_engine(MitoConfig::default()).await; - - let err = engine - .handle_request( - RegionId::new(1, 1), - RegionRequest::Open(RegionOpenRequest { - engine: String::new(), - region_dir: "empty".to_string(), - options: HashMap::default(), - }), - ) - .await - .unwrap_err(); - assert!( - matches!(err, Error::RegionNotFound { .. }), - "unexpected err: {err}" - ); -} - -#[tokio::test] -async fn test_engine_open_existing() { - let mut env = TestEnv::with_prefix("open-exiting"); - let engine = env.create_engine(MitoConfig::default()).await; - - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - let region_dir = request.region_dir.clone(); - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - engine - .handle_request( - region_id, - RegionRequest::Open(RegionOpenRequest { - engine: String::new(), - region_dir, - options: HashMap::default(), - }), - ) - .await - .unwrap(); -} - -#[tokio::test] -async fn test_engine_close_region() { - let mut env = TestEnv::with_prefix("close"); - let engine = env.create_engine(MitoConfig::default()).await; - - let region_id = RegionId::new(1, 1); - // It's okay to close a region doesn't exist. - engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) - .await - .unwrap(); - - let request = CreateRequestBuilder::new().build(); - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - // Close the created region. - engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) - .await - .unwrap(); - assert!(!engine.is_region_exists(region_id)); - - // It's okay to close this region again. - engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) - .await - .unwrap(); -} - -#[tokio::test] -async fn test_engine_reopen_region() { - let mut env = TestEnv::with_prefix("reopen-region"); - let engine = env.create_engine(MitoConfig::default()).await; - - let region_id = RegionId::new(1, 1); - let request = CreateRequestBuilder::new().build(); - let region_dir = request.region_dir.clone(); - engine - .handle_request(region_id, RegionRequest::Create(request)) - .await - .unwrap(); - - // Close the region. - engine - .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) - .await - .unwrap(); - - // Open the region again. - engine - .handle_request( - region_id, - RegionRequest::Open(RegionOpenRequest { - engine: String::new(), - region_dir, - options: HashMap::default(), - }), - ) - .await - .unwrap(); - assert!(engine.is_region_exists(region_id)); -} - fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema { api::v1::ColumnSchema { column_name: metadata.column_schema.name.clone(), diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index c140855797..149dbe7d24 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -51,6 +51,7 @@ impl VersionControl { version: Arc::new(version), committed_sequence: 0, last_entry_id: 0, + is_dropped: false, }), } } @@ -90,6 +91,13 @@ impl VersionControl { version_data.version = new_version; Some(mutable_id) } + + /// Mark all opened files as deleted and set the delete marker in [VersionControlData] + pub(crate) fn mark_dropped(&self) { + let mut data = self.data.write().unwrap(); + data.is_dropped = true; + data.version.ssts.mark_all_deleted(); + } } pub(crate) type VersionControlRef = Arc; @@ -103,6 +111,8 @@ pub(crate) struct VersionControlData { pub(crate) committed_sequence: SequenceNumber, /// Last WAL entry Id. pub(crate) last_entry_id: EntryId, + /// Marker of whether this region is dropped/dropping + pub(crate) is_dropped: bool, } /// Static metadata of a region. diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index b531f9f2ce..2cb82db308 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -102,6 +102,7 @@ impl RegionWriteCtx { version, committed_sequence, last_entry_id, + .. } = version_control.current(); RegionWriteCtx { diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 0fbd978748..05da526a20 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -134,7 +134,6 @@ impl FileHandle { } /// Mark the file as deleted and will delete it on drop asynchronously - #[inline] pub fn mark_deleted(&self) { self.inner.deleted.store(true, Ordering::Relaxed); } diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index f016162d39..05c9d0f741 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -40,6 +40,15 @@ impl SstVersion { pub(crate) fn levels(&self) -> &[LevelMeta] { &self.levels } + + /// Mark all SSTs in this version as deleted. + pub(crate) fn mark_all_deleted(&self) { + for level_meta in self.levels.iter() { + for file_handle in level_meta.files.values() { + file_handle.mark_deleted(); + } + } + } } // We only has fixed number of level, so we use array to hold elements. This implementation diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index a829624bfa..44934bbab0 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -16,6 +16,7 @@ mod handle_close; mod handle_create; +mod handle_drop; mod handle_flush; mod handle_open; mod handle_write; @@ -48,6 +49,8 @@ use crate::wal::Wal; /// Identifier for a worker. pub(crate) type WorkerId = u32; +pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping"; + #[cfg_attr(doc, aquamarine::aquamarine)] /// A fixed size group of [RegionWorkers](RegionWorker). /// @@ -189,6 +192,7 @@ impl WorkerStarter { id: self.id, config: self.config, regions: regions.clone(), + dropping_regions: Arc::new(RegionMap::default()), receiver, wal: Wal::new(self.log_store), object_store: self.object_store, @@ -302,6 +306,8 @@ struct RegionWorkerLoop { config: Arc, /// Regions bound to the worker. regions: RegionMapRef, + /// Regions that are not yet fully dropped. + dropping_regions: RegionMapRef, /// Request receiver. receiver: Receiver, /// WAL of the engine. @@ -400,10 +406,8 @@ impl RegionWorkerLoop { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, DdlRequest::Open(req) => self.handle_open_request(ddl.region_id, req).await, DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await, - DdlRequest::Alter(_) - | DdlRequest::Drop(_) - | DdlRequest::Flush(_) - | DdlRequest::Compact(_) => todo!(), + DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, + DdlRequest::Alter(_) | DdlRequest::Flush(_) | DdlRequest::Compact(_) => todo!(), }; if let Some(sender) = ddl.sender { diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs new file mode 100644 index 0000000000..7a7d07b981 --- /dev/null +++ b/src/mito2/src/worker/handle_drop.rs @@ -0,0 +1,147 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling drop request. + +use std::time::Duration; + +use common_query::Output; +use common_telemetry::info; +use common_telemetry::tracing::warn; +use futures::StreamExt; +use object_store::util::join_path; +use object_store::{EntryMode, Metakey, ObjectStore}; +use snafu::ResultExt; +use store_api::storage::RegionId; +use tokio::time::sleep; + +use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result}; +use crate::region::RegionMapRef; +use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; + +const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes +const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288) + +impl RegionWorkerLoop { + pub(crate) async fn handle_drop_request(&mut self, region_id: RegionId) -> Result { + let Some(region) = self.regions.get_region(region_id) else { + return RegionNotFoundSnafu { region_id }.fail(); + }; + + info!("Try to drop region: {}", region_id); + region.stop().await?; + + // write dropping marker + let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE); + self.object_store + .write(&marker_path, vec![]) + .await + .context(OpenDalSnafu)?; + + // remove this region from region map to prevent other requests from accessing this region + self.regions.remove_region(region_id); + self.dropping_regions.insert_region(region.clone()); + + // mark region version as dropped + region.version_control.mark_dropped(); + info!( + "Region {} is dropped logically, but some files are not deleted yet", + region_id + ); + + // detach a background task to delete the region dir + let region_dir = region.access_layer.region_dir().to_owned(); + let object_store = self.object_store.clone(); + let dropping_regions = self.dropping_regions.clone(); + common_runtime::spawn_bg(async move { + later_drop_task(region_id, region_dir, object_store, dropping_regions).await; + }); + + Ok(Output::AffectedRows(0)) + } +} + +/// Background GC task to remove the entire region path once it find there is no +/// parquet file left. +/// +/// This task will keep running until finished. Any resource captured by it will +/// not be released before then. Be sure to only pass weak reference if something +/// is depended on ref-count mechanism. +async fn later_drop_task( + region_id: RegionId, + region_path: String, + object_store: ObjectStore, + dropping_regions: RegionMapRef, +) { + for _ in 0..MAX_RETRY_TIMES { + sleep(Duration::from_secs(GC_TASK_INTERVAL_SEC)).await; + let result = remove_region_dir_once(®ion_path, &object_store).await; + if let Err(err) = result { + warn!( + "Error occurs during trying to GC region dir {}: {}", + region_path, err + ); + } else { + dropping_regions.remove_region(region_id); + info!("Region {} is dropped", region_path); + } + } + + warn!( + "Failed to GC region dir {} after {} retries, giving up", + region_path, MAX_RETRY_TIMES + ); +} + +// TODO(ruihang): place the marker in a separate dir +pub(crate) async fn remove_region_dir_once( + region_path: &str, + object_store: &ObjectStore, +) -> Result<()> { + // list all files under the given region path to check if there are un-deleted parquet files + let mut has_parquet_file = false; + // record all paths that neither ends with .parquet nor the marker file + let mut files_to_remove_first = vec![]; + let mut files = object_store.scan(region_path).await.context(OpenDalSnafu)?; + while let Some(file) = files.next().await { + let file = file.context(OpenDalSnafu)?; + if file.path().ends_with(".parquet") { + has_parquet_file = true; + break; + } else if !file.path().ends_with(DROPPING_MARKER_FILE) { + let meta = object_store + .metadata(&file, Metakey::Mode) + .await + .context(OpenDalSnafu)?; + if meta.mode() == EntryMode::FILE { + files_to_remove_first.push(file.path().to_string()); + } + } + } + + if !has_parquet_file { + // no parquet file found, delete the region path + // first delete all files other than the marker + object_store + .remove(files_to_remove_first) + .await + .context(OpenDalSnafu)?; + // then remove the marker with this dir + object_store + .remove_all(region_path) + .await + .context(OpenDalSnafu)?; + } + Ok(()) +} diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index b7547c3205..6e523c2bc4 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -44,6 +44,7 @@ impl RegionWorkerLoop { // 2. write manifest // 3. update region metadata. // 4. handle all pending requests. + // 5. remove flushed files if the region is dropped. unimplemented!() } diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 64f50788d8..5614178c08 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -18,13 +18,16 @@ use std::sync::Arc; use common_query::Output; use common_telemetry::info; +use object_store::util::join_path; +use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; -use crate::error::Result; +use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result}; use crate::region::opener::RegionOpener; -use crate::worker::RegionWorkerLoop; +use crate::worker::handle_drop::remove_region_dir_once; +use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; impl RegionWorkerLoop { pub(crate) async fn handle_open_request( @@ -36,6 +39,19 @@ impl RegionWorkerLoop { return Ok(Output::AffectedRows(0)); } + // Check if this region is pending drop. And clean the entire dir if so. + if !self.dropping_regions.is_region_exists(region_id) + && self + .object_store + .is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE)) + .await + .context(OpenDalSnafu)? + { + let result = remove_region_dir_once(&request.region_dir, &self.object_store).await; + info!("Region {} is dropped, result: {:?}", region_id, result); + return RegionNotFoundSnafu { region_id }.fail(); + } + info!("Try to open region {}", region_id); // Open region from specific region dir.