mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-01 12:50:40 +00:00
feat: add partial_drop to DropRequest (#7597)
* feat: add `partial_drop` to `DropRequest` Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: handle non-partial-drop drop task Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: remove files immediately Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: update proto Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5726,7 +5726,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4a14b140f20d2a8fa2046fbbc5e0947203a3e34c#4a14b140f20d2a8fa2046fbbc5e0947203a3e34c"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d8e09e8487c344323ee364b67f7187b1e0a6102c#d8e09e8487c344323ee364b67f7187b1e0a6102c"
|
||||
dependencies = [
|
||||
"prost 0.14.1",
|
||||
"prost-types 0.14.1",
|
||||
|
||||
@@ -152,7 +152,7 @@ etcd-client = { version = "0.17", features = [
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a14b140f20d2a8fa2046fbbc5e0947203a3e34c" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d8e09e8487c344323ee364b67f7187b1e0a6102c" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -126,6 +126,7 @@ impl State for DropDatabaseExecutor {
|
||||
&self.physical_region_routes,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
info!("Table: {}({}) is dropped", self.table_name, self.table_id);
|
||||
|
||||
@@ -162,6 +162,7 @@ impl DropTableProcedure {
|
||||
&self.data.physical_region_routes,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
self.data.state = DropTableState::DeleteTombstone;
|
||||
|
||||
@@ -219,6 +219,7 @@ impl DropTableExecutor {
|
||||
region_routes: &[RegionRoute],
|
||||
fast_path: bool,
|
||||
force: bool,
|
||||
partial_drop: bool,
|
||||
) -> Result<()> {
|
||||
// Drops leader regions on datanodes.
|
||||
let leaders = find_leaders(region_routes);
|
||||
@@ -243,6 +244,7 @@ impl DropTableExecutor {
|
||||
region_id: region_id.as_u64(),
|
||||
fast_path,
|
||||
force,
|
||||
partial_drop,
|
||||
})),
|
||||
};
|
||||
let datanode = datanode.clone();
|
||||
|
||||
@@ -1750,6 +1750,7 @@ mod tests {
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: false,
|
||||
partial_drop: false,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
@@ -1851,6 +1852,7 @@ mod tests {
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: false,
|
||||
partial_drop: false,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -133,6 +133,7 @@ impl DeallocateRegion {
|
||||
region_routes,
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.context(error::DeallocateRegionsSnafu { table_id })?;
|
||||
|
||||
@@ -919,6 +919,7 @@ mod test {
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: false,
|
||||
partial_drop: false,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
@@ -934,6 +935,7 @@ mod test {
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: true,
|
||||
partial_drop: false,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -74,7 +74,9 @@ impl MetricEngineInner {
|
||||
if is_physical_region_busy && force {
|
||||
info!("Dropping physical region {} with force", data_region_id);
|
||||
}
|
||||
return self.drop_physical_region(data_region_id).await;
|
||||
return self
|
||||
.drop_physical_region(data_region_id, req.partial_drop)
|
||||
.await;
|
||||
}
|
||||
|
||||
if fast_path {
|
||||
@@ -105,7 +107,11 @@ impl MetricEngineInner {
|
||||
}
|
||||
}
|
||||
|
||||
async fn drop_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
|
||||
async fn drop_physical_region(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
partial_drop: bool,
|
||||
) -> Result<AffectedRows> {
|
||||
let data_region_id = utils::to_data_region_id(region_id);
|
||||
let metadata_region_id = utils::to_metadata_region_id(region_id);
|
||||
|
||||
@@ -118,6 +124,7 @@ impl MetricEngineInner {
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: false,
|
||||
partial_drop,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
@@ -128,6 +135,7 @@ impl MetricEngineInner {
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: false,
|
||||
partial_drop,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -74,6 +74,7 @@ async fn test_engine_drop_region_with_format(flat_format: bool) {
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: false,
|
||||
partial_drop: false,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
@@ -111,6 +112,7 @@ async fn test_engine_drop_region_with_format(flat_format: bool) {
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: false,
|
||||
partial_drop: false,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
@@ -258,6 +260,7 @@ async fn test_engine_drop_region_for_custom_store_with_format(flat_format: bool)
|
||||
RegionRequest::Drop(RegionDropRequest {
|
||||
fast_path: false,
|
||||
force: false,
|
||||
partial_drop: false,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -39,8 +39,8 @@ use store_api::region_engine::{
|
||||
use store_api::region_request::{
|
||||
AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
|
||||
RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
|
||||
RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
|
||||
RegionRequest, RegionTruncateRequest,
|
||||
RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
|
||||
RegionOpenRequest, RegionRequest, RegionTruncateRequest,
|
||||
};
|
||||
use store_api::storage::{FileId, RegionId};
|
||||
use tokio::sync::oneshot::{self, Receiver, Sender};
|
||||
@@ -687,10 +687,10 @@ impl WorkerRequest {
|
||||
sender: sender.into(),
|
||||
request: DdlRequest::Create(v),
|
||||
}),
|
||||
RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
|
||||
RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
|
||||
region_id,
|
||||
sender: sender.into(),
|
||||
request: DdlRequest::Drop,
|
||||
request: DdlRequest::Drop(v),
|
||||
}),
|
||||
RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
|
||||
region_id,
|
||||
@@ -845,7 +845,7 @@ impl WorkerRequest {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum DdlRequest {
|
||||
Create(RegionCreateRequest),
|
||||
Drop,
|
||||
Drop(RegionDropRequest),
|
||||
Open((RegionOpenRequest, Option<WalEntryReceiver>)),
|
||||
Close(RegionCloseRequest),
|
||||
Alter(RegionAlterRequest),
|
||||
|
||||
@@ -1095,7 +1095,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
for ddl in ddl_requests.drain(..) {
|
||||
let res = match ddl.request {
|
||||
DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
|
||||
DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
|
||||
DdlRequest::Drop(req) => {
|
||||
self.handle_drop_request(ddl.region_id, req.partial_drop)
|
||||
.await
|
||||
}
|
||||
DdlRequest::Open((req, wal_entry_receiver)) => {
|
||||
self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
|
||||
.await;
|
||||
|
||||
@@ -41,6 +41,7 @@ where
|
||||
pub(crate) async fn handle_drop_request(
|
||||
&mut self,
|
||||
region_id: RegionId,
|
||||
partial_drop: bool,
|
||||
) -> Result<AffectedRows> {
|
||||
let region = self.regions.writable_region(region_id)?;
|
||||
|
||||
@@ -113,10 +114,6 @@ where
|
||||
let gc_enabled = self.file_ref_manager.is_gc_enabled();
|
||||
|
||||
common_runtime::spawn_global(async move {
|
||||
let gc_duration = listener
|
||||
.on_later_drop_begin(region_id)
|
||||
.unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
|
||||
|
||||
let removed = if gc_enabled {
|
||||
later_drop_task_with_global_gc(
|
||||
region_id,
|
||||
@@ -124,10 +121,14 @@ where
|
||||
path_type,
|
||||
object_store,
|
||||
dropping_regions,
|
||||
gc_duration,
|
||||
partial_drop,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
let gc_duration = listener
|
||||
.on_later_drop_begin(region_id)
|
||||
.unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
|
||||
|
||||
later_drop_task_without_global_gc(
|
||||
region_id,
|
||||
region_dir.clone(),
|
||||
@@ -177,7 +178,7 @@ async fn later_drop_task_without_global_gc(
|
||||
region_path,
|
||||
object_store,
|
||||
dropping_regions,
|
||||
gc_duration,
|
||||
Some(gc_duration),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
@@ -188,7 +189,7 @@ async fn remove_region_with_retry(
|
||||
region_path: String,
|
||||
object_store: ObjectStore,
|
||||
dropping_regions: std::sync::Arc<crate::region::RegionMap>,
|
||||
gc_duration: Duration,
|
||||
gc_duration: Option<Duration>,
|
||||
mut force: bool,
|
||||
) -> bool {
|
||||
for _ in 0..MAX_RETRY_TIMES {
|
||||
@@ -207,7 +208,9 @@ async fn remove_region_with_retry(
|
||||
}
|
||||
Ok(false) => (),
|
||||
}
|
||||
sleep(gc_duration).await;
|
||||
if let Some(duration) = gc_duration {
|
||||
sleep(duration).await;
|
||||
}
|
||||
// Force recycle after gc duration.
|
||||
force = true;
|
||||
}
|
||||
@@ -226,15 +229,19 @@ async fn later_drop_task_with_global_gc(
|
||||
path_type: PathType,
|
||||
object_store: ObjectStore,
|
||||
dropping_regions: RegionMapRef,
|
||||
gc_duration: Duration,
|
||||
partial_drop: bool,
|
||||
) -> bool {
|
||||
if path_type == PathType::Metadata {
|
||||
// For metadata regions or regions marked for full deletion (such as when dropping a table)
|
||||
// the region directory is forcefully removed immediately.
|
||||
//
|
||||
// TODO(discord9): Evaluate removing files instantly rather than waiting for the GC period.
|
||||
if path_type == PathType::Metadata || !partial_drop {
|
||||
remove_region_with_retry(
|
||||
region_id,
|
||||
region_path,
|
||||
object_store,
|
||||
dropping_regions,
|
||||
gc_duration,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -272,6 +272,7 @@ fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)>
|
||||
RegionDropRequest {
|
||||
fast_path: drop.fast_path,
|
||||
force: drop.force,
|
||||
partial_drop: drop.partial_drop,
|
||||
},
|
||||
))
|
||||
}
|
||||
@@ -542,6 +543,10 @@ pub struct RegionDropRequest {
|
||||
/// Forces the drop of a physical region and all its associated logical regions.
|
||||
/// Only relevant for physical regions managed by the Metric Engine.
|
||||
pub force: bool,
|
||||
|
||||
/// If true, indicates that only a portion of the region is being dropped, and files may still be referenced by other regions.
|
||||
/// This is used to prevent deletion of files that are still in use by other regions.
|
||||
pub partial_drop: bool,
|
||||
}
|
||||
|
||||
/// Open region request.
|
||||
|
||||
Reference in New Issue
Block a user