From 56ee8baa3f4bd3d7036929336acdb1f4d65564b7 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 6 Mar 2026 16:25:44 +0800 Subject: [PATCH] feat: admin gc table/regions (#7619) * feat: gc table Signed-off-by: discord9 * test: admin gc Signed-off-by: discord9 * chore: after rebase fix Signed-off-by: discord9 * refactor: GcStats Signed-off-by: discord9 * refactor: use gc ticker for admin gc Signed-off-by: discord9 * fix: region routes override Signed-off-by: discord9 * test: non happy path Signed-off-by: discord9 * refactor: gc job report enum Signed-off-by: discord9 * test: process 0 regions Signed-off-by: discord9 * after rebase Signed-off-by: discord9 * feat: allow manual gc to return error Signed-off-by: discord9 * chore: update proto Signed-off-by: discord9 * per review Signed-off-by: discord9 * chore: timeout and update proto Signed-off-by: discord9 * chore: udpate proto Signed-off-by: discord9 --------- Signed-off-by: discord9 --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/common/function/src/admin.rs | 4 + src/common/function/src/admin/gc.rs | 220 ++++++++ src/common/function/src/handlers.rs | 10 +- src/common/function/src/state.rs | 21 +- src/common/meta/src/procedure_executor.rs | 28 +- src/common/meta/src/rpc/procedure.rs | 24 + .../src/heartbeat/handler/gc_worker.rs | 8 +- src/meta-client/src/client.rs | 38 +- src/meta-client/src/client/procedure.rs | 116 +++- src/meta-srv/src/gc.rs | 2 +- src/meta-srv/src/gc/dropped.rs | 16 +- src/meta-srv/src/gc/handler.rs | 39 +- src/meta-srv/src/gc/mock.rs | 3 + src/meta-srv/src/gc/mock/basic.rs | 50 +- src/meta-srv/src/gc/mock/concurrent.rs | 50 +- src/meta-srv/src/gc/mock/err_handle.rs | 61 ++- src/meta-srv/src/gc/mock/integration.rs | 52 +- src/meta-srv/src/gc/mock/misc.rs | 28 +- src/meta-srv/src/gc/scheduler.rs | 248 ++++++++- src/meta-srv/src/service/procedure.rs | 220 +++++++- src/mito2/src/gc.rs | 3 + src/operator/src/procedure.rs | 20 +- src/store-api/src/storage/file.rs | 13 + tests-integration/src/test_util.rs | 9 + tests-integration/src/tests/gc.rs | 1 + tests-integration/src/tests/gc/admin.rs | 518 ++++++++++++++++++ tests-integration/tests/repartition.rs | 13 +- 29 files changed, 1687 insertions(+), 132 deletions(-) create mode 100644 src/common/function/src/admin/gc.rs create mode 100644 tests-integration/src/tests/gc/admin.rs diff --git a/Cargo.lock b/Cargo.lock index 790ff46130..be6ad4c665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5690,7 +5690,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4010e25ea09bf0a3ef359b946376da67d0323c5d#4010e25ea09bf0a3ef359b946376da67d0323c5d" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=092ba1d01e2da676dca66cca7eebb55009da8ef8#092ba1d01e2da676dca66cca7eebb55009da8ef8" dependencies = [ "prost 0.14.1", "prost-types 0.14.1", diff --git a/Cargo.toml b/Cargo.toml index fa6fdc0298..17cdb97a52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,7 +154,7 @@ etcd-client = { version = "0.17", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4010e25ea09bf0a3ef359b946376da67d0323c5d" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "092ba1d01e2da676dca66cca7eebb55009da8ef8" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs index e7fd186b86..c31b608967 100644 --- a/src/common/function/src/admin.rs +++ b/src/common/function/src/admin.rs @@ -15,6 +15,7 @@ mod build_index_table; mod flush_compact_region; mod flush_compact_table; +mod gc; mod migrate_region; mod reconcile_catalog; mod reconcile_database; @@ -22,6 +23,7 @@ mod reconcile_table; use flush_compact_region::{CompactRegionFunction, FlushRegionFunction}; use flush_compact_table::{CompactTableFunction, FlushTableFunction}; +use gc::{GcRegionsFunction, GcTableFunction}; use migrate_region::MigrateRegionFunction; use reconcile_catalog::ReconcileCatalogFunction; use reconcile_database::ReconcileDatabaseFunction; @@ -42,6 +44,8 @@ impl AdminFunction { registry.register(CompactRegionFunction::factory()); registry.register(FlushTableFunction::factory()); registry.register(CompactTableFunction::factory()); + registry.register(GcRegionsFunction::factory()); + registry.register(GcTableFunction::factory()); registry.register(BuildIndexFunction::factory()); registry.register(FlushFlowFunction::factory()); registry.register(ReconcileCatalogFunction::factory()); diff --git a/src/common/function/src/admin/gc.rs b/src/common/function/src/admin/gc.rs new file mode 100644 index 0000000000..ad2e3d3890 --- /dev/null +++ b/src/common/function/src/admin/gc.rs @@ -0,0 +1,220 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_error::ext::BoxedError; +use common_macro::admin_fn; +use common_meta::rpc::procedure::{GcRegionsRequest, GcTableRequest}; +use common_query::error::{ + InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, + UnsupportedInputDataTypeSnafu, +}; +use datafusion_expr::{Signature, TypeSignature, Volatility}; +use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::prelude::*; +use session::context::QueryContextRef; +use snafu::{ResultExt, ensure}; + +use crate::handlers::ProcedureServiceHandlerRef; +use crate::helper::cast_u64; + +const DEFAULT_GC_TIMEOUT: Duration = Duration::from_secs(60); +const DEFAULT_FULL_FILE_LISTING: bool = false; + +#[admin_fn( + name = GcRegionsFunction, + display_name = gc_regions, + sig_fn = gc_regions_signature, + ret = uint64 +)] +pub(crate) async fn gc_regions( + procedure_service_handler: &ProcedureServiceHandlerRef, + _ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + let (region_ids, full_file_listing) = parse_gc_regions_params(params)?; + + let resp = procedure_service_handler + .gc_regions(GcRegionsRequest { + region_ids, + full_file_listing, + timeout: DEFAULT_GC_TIMEOUT, + }) + .await?; + + Ok(Value::from(resp.processed_regions)) +} + +#[admin_fn( + name = GcTableFunction, + display_name = gc_table, + sig_fn = gc_table_signature, + ret = uint64 +)] +pub(crate) async fn gc_table( + procedure_service_handler: &ProcedureServiceHandlerRef, + query_ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + let (catalog_name, schema_name, table_name, full_file_listing) = + parse_gc_table_params(params, query_ctx)?; + + let resp = procedure_service_handler + .gc_table(GcTableRequest { + catalog_name, + schema_name, + table_name, + full_file_listing, + timeout: DEFAULT_GC_TIMEOUT, + }) + .await?; + + Ok(Value::from(resp.processed_regions)) +} + +fn parse_gc_regions_params(params: &[ValueRef<'_>]) -> Result<(Vec, bool)> { + ensure!( + !params.is_empty(), + InvalidFuncArgsSnafu { + err_msg: "The length of the args is not correct, expect at least 1 region id, have 0" + .to_string(), + } + ); + + let (full_file_listing, region_params) = match params.last() { + Some(ValueRef::Boolean(value)) => (*value, ¶ms[..params.len() - 1]), + _ => (DEFAULT_FULL_FILE_LISTING, params), + }; + + ensure!( + !region_params.is_empty(), + InvalidFuncArgsSnafu { + err_msg: "The length of the args is not correct, expect at least 1 region id" + .to_string(), + } + ); + + let mut region_ids = Vec::with_capacity(region_params.len()); + for param in region_params { + let Some(region_id) = cast_u64(param)? else { + return UnsupportedInputDataTypeSnafu { + function: "gc_regions", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + region_ids.push(region_id); + } + + Ok((region_ids, full_file_listing)) +} + +fn parse_gc_table_params( + params: &[ValueRef<'_>], + query_ctx: &QueryContextRef, +) -> Result<(String, String, String, bool)> { + ensure!( + matches!(params.len(), 1 | 2), + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 1 or 2, have: {}", + params.len() + ), + } + ); + + let ValueRef::String(table_name) = params[0] else { + return UnsupportedInputDataTypeSnafu { + function: "gc_table", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + + let full_file_listing = if params.len() == 2 { + let ValueRef::Boolean(value) = params[1] else { + return UnsupportedInputDataTypeSnafu { + function: "gc_table", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + value + } else { + DEFAULT_FULL_FILE_LISTING + }; + + let (catalog_name, schema_name, table_name) = + session::table_name::table_name_to_full_name(table_name, query_ctx) + .map_err(BoxedError::new) + .context(TableMutationSnafu)?; + + Ok((catalog_name, schema_name, table_name, full_file_listing)) +} + +fn gc_regions_signature() -> Signature { + Signature::variadic_any(Volatility::Immutable) +} + +fn gc_table_signature() -> Signature { + Signature::one_of( + vec![ + TypeSignature::Uniform(1, vec![ArrowDataType::Utf8]), + TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Boolean]), + ], + Volatility::Immutable, + ) +} + +#[cfg(test)] +mod tests { + use session::context::QueryContext; + + use super::*; + + #[test] + fn test_parse_gc_regions_params_with_full_file_listing() { + let params = vec![ + ValueRef::UInt64(1), + ValueRef::UInt64(2), + ValueRef::Boolean(true), + ]; + let (region_ids, full_file_listing) = parse_gc_regions_params(¶ms).unwrap(); + + assert_eq!(region_ids, vec![1, 2]); + assert!(full_file_listing); + } + + #[test] + fn test_parse_gc_regions_params_default_full_file_listing() { + let params = vec![ValueRef::UInt64(1), ValueRef::UInt32(2)]; + let (region_ids, full_file_listing) = parse_gc_regions_params(¶ms).unwrap(); + + assert_eq!(region_ids, vec![1, 2]); + assert!(!full_file_listing); + } + + #[test] + fn test_parse_gc_table_params_with_full_file_listing() { + let params = vec![ValueRef::String("public.t"), ValueRef::Boolean(true)]; + let (catalog, schema, table, full_file_listing) = + parse_gc_table_params(¶ms, &QueryContext::arc()).unwrap(); + + assert_eq!(catalog, "greptime"); + assert_eq!(schema, "public"); + assert_eq!(table, "t"); + assert!(full_file_listing); + } +} diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index 0e6060e90c..50318fd14f 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -19,7 +19,9 @@ use async_trait::async_trait; use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ - ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse, + GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, + ProcedureStateResponse, }; use common_query::Output; use common_query::error::Result; @@ -85,6 +87,12 @@ pub trait ProcedureServiceHandler: Send + Sync { /// Get the catalog manager fn catalog_manager(&self) -> &CatalogManagerRef; + + /// Manually trigger GC for specific regions. + async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result; + + /// Manually trigger GC for a table. + async fn gc_table(&self, request: MetaGcTableRequest) -> Result; } /// This flow service handler is only use for flush flow for now. diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index d1a3d341b4..06dff44e79 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -37,7 +37,8 @@ impl FunctionState { use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ - ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest, + MigrateRegionRequest, ProcedureStateResponse, }; use common_query::Output; use common_query::error::Result; @@ -82,6 +83,24 @@ impl FunctionState { Ok(()) } + async fn gc_regions(&self, _request: GcRegionsRequest) -> Result { + Ok(GcResponse { + processed_regions: 1, + need_retry_regions: vec![], + deleted_files: 0, + deleted_indexes: 0, + }) + } + + async fn gc_table(&self, _request: GcTableRequest) -> Result { + Ok(GcResponse { + processed_regions: 1, + need_retry_regions: vec![], + deleted_files: 0, + deleted_indexes: 0, + }) + } + fn catalog_manager(&self) -> &CatalogManagerRef { unimplemented!() } diff --git a/src/common/meta/src/procedure_executor.rs b/src/common/meta/src/procedure_executor.rs index f27f7ebf47..c0165dd80f 100644 --- a/src/common/meta/src/procedure_executor.rs +++ b/src/common/meta/src/procedure_executor.rs @@ -25,8 +25,8 @@ use crate::error::{ }; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{ - self, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, - ProcedureStateResponse, + self, GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest, + MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, }; /// The context of procedure executor. @@ -78,6 +78,30 @@ pub trait ProcedureExecutor: Send + Sync { pid: &str, ) -> Result; + /// Manually trigger GC for the specified regions. + async fn gc_regions( + &self, + _ctx: &ExecutorContext, + _request: GcRegionsRequest, + ) -> Result { + UnsupportedSnafu { + operation: "gc_regions", + } + .fail() + } + + /// Manually trigger GC for the specified table. + async fn gc_table( + &self, + _ctx: &ExecutorContext, + _request: GcTableRequest, + ) -> Result { + UnsupportedSnafu { + operation: "gc_table", + } + .fail() + } + async fn list_procedures(&self, ctx: &ExecutorContext) -> Result; } diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs index 313b7e178e..baab254fef 100644 --- a/src/common/meta/src/rpc/procedure.rs +++ b/src/common/meta/src/rpc/procedure.rs @@ -78,6 +78,30 @@ pub struct RemoveRegionFollowerRequest { pub peer_id: u64, } +#[derive(Debug, Clone)] +pub struct GcRegionsRequest { + pub region_ids: Vec, + pub full_file_listing: bool, + pub timeout: Duration, +} + +#[derive(Debug, Clone)] +pub struct GcTableRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub full_file_listing: bool, + pub timeout: Duration, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct GcResponse { + pub processed_regions: u64, + pub need_retry_regions: Vec, + pub deleted_files: u64, + pub deleted_indexes: u64, +} + /// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`]. pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result { ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| { diff --git a/src/datanode/src/heartbeat/handler/gc_worker.rs b/src/datanode/src/heartbeat/handler/gc_worker.rs index 9eb5fb368f..b974c28dcc 100644 --- a/src/datanode/src/heartbeat/handler/gc_worker.rs +++ b/src/datanode/src/heartbeat/handler/gc_worker.rs @@ -105,13 +105,9 @@ impl InstructionHandler for GcRegionsHandler { // Merge reports let mut merged_report = GcReport::default(); for report in reports { - merged_report - .deleted_files - .extend(report.deleted_files.into_iter()); - merged_report - .deleted_indexes - .extend(report.deleted_indexes.into_iter()); + merged_report.merge(report); } + Ok(merged_report) } .instrument(common_telemetry::tracing::info_span!("gc_worker_run")), diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 00b208211a..4a79105615 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -48,9 +48,9 @@ use common_meta::range_stream::PaginationStream; use common_meta::rpc::KeyValue; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::procedure::{ - AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest, - MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, - RemoveRegionFollowerRequest, RemoveTableFollowerRequest, + AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse, + GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, + ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest, }; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -387,6 +387,28 @@ impl ProcedureExecutor for MetaClient { .context(meta_error::ExternalSnafu) } + async fn gc_regions( + &self, + _ctx: &ExecutorContext, + request: GcRegionsRequest, + ) -> MetaResult { + self.gc_regions(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } + + async fn gc_table( + &self, + _ctx: &ExecutorContext, + request: GcTableRequest, + ) -> MetaResult { + self.gc_table(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } + async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult { self.procedure_client() .map_err(BoxedError::new) @@ -715,6 +737,16 @@ impl MetaClient { self.procedure_client()?.reconcile(request).await } + /// Manually trigger GC for specific regions. + pub async fn gc_regions(&self, request: GcRegionsRequest) -> Result { + self.procedure_client()?.gc_regions(request).await + } + + /// Manually trigger GC for a table (all its regions). + pub async fn gc_table(&self, request: GcTableRequest) -> Result { + self.procedure_client()?.gc_table(request).await + } + /// Submit a DDL task pub async fn submit_ddl_task( &self, diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index f813aebe20..93e37511d9 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -18,11 +18,16 @@ use std::time::Duration; use api::v1::meta::procedure_service_client::ProcedureServiceClient; use api::v1::meta::{ - DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, - ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, - QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role, + DdlTaskRequest, DdlTaskResponse, GcRegionsRequest, GcRegionsResponse, GcTableRequest, + GcTableResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest, + ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, QueryProcedureRequest, + ReconcileRequest, ReconcileResponse, RequestHeader, ResponseHeader, Role, }; use common_grpc::channel_manager::ChannelManager; +use common_meta::rpc::procedure::{ + GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse, + GcTableRequest as MetaGcTableRequest, +}; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{error, info, warn}; use snafu::{ResultExt, ensure}; @@ -105,6 +110,16 @@ impl Client { let inner = self.inner.read().await; inner.list_procedures().await } + + pub async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result { + let inner = self.inner.read().await; + inner.gc_regions(request).await + } + + pub async fn gc_table(&self, request: MetaGcTableRequest) -> Result { + let inner = self.inner.read().await; + inner.gc_table(request).await + } } #[derive(Debug)] @@ -260,6 +275,78 @@ impl Inner { .await } + async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result { + let timeout = request.timeout; + let req = GcRegionsRequest { + header: Some(RequestHeader { + protocol_version: 0, + member_id: self.id, + role: self.role as i32, + tracing_context: TracingContext::from_current_span().to_w3c(), + }), + region_ids: request.region_ids, + full_file_listing: request.full_file_listing, + timeout_secs: timeout.as_secs() as u32, + }; + + let resp: GcRegionsResponse = self + .with_retry( + "gc_regions", + move |mut client| { + let mut req = Request::new(req.clone()); + req.set_timeout(timeout); + async move { client.gc_regions(req).await.map(|res| res.into_inner()) } + }, + |resp: &GcRegionsResponse| &resp.header, + ) + .await?; + + let stats = resp.stats.unwrap_or_default(); + Ok(MetaGcResponse { + processed_regions: stats.processed_regions, + need_retry_regions: stats.need_retry_regions, + deleted_files: stats.deleted_files, + deleted_indexes: stats.deleted_indexes, + }) + } + + async fn gc_table(&self, request: MetaGcTableRequest) -> Result { + let timeout = request.timeout; + let req = GcTableRequest { + header: Some(RequestHeader { + protocol_version: 0, + member_id: self.id, + role: self.role as i32, + tracing_context: TracingContext::from_current_span().to_w3c(), + }), + catalog_name: request.catalog_name, + schema_name: request.schema_name, + table_name: request.table_name, + full_file_listing: request.full_file_listing, + timeout_secs: timeout.as_secs() as u32, + }; + + let resp: GcTableResponse = self + .with_retry( + "gc_table", + move |mut client| { + let mut req = Request::new(req.clone()); + req.set_timeout(timeout); + async move { client.gc_table(req).await.map(|res| res.into_inner()) } + }, + |resp: &GcTableResponse| &resp.header, + ) + .await?; + + let stats = resp.stats.unwrap_or_default(); + Ok(MetaGcResponse { + processed_regions: stats.processed_regions, + need_retry_regions: stats.need_retry_regions, + deleted_files: stats.deleted_files, + deleted_indexes: stats.deleted_indexes, + }) + } + async fn query_procedure_state(&self, pid: &str) -> Result { let mut req = QueryProcedureRequest { pid: Some(ProcedureId { key: pid.into() }), @@ -333,10 +420,11 @@ mod tests { use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer}; use api::v1::meta::procedure_service_server::{ProcedureService, ProcedureServiceServer}; use api::v1::meta::{ - AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, HeartbeatRequest, - HeartbeatResponse, MigrateRegionRequest, MigrateRegionResponse, Peer, - ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, - QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role, + AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, GcRegionsRequest, + GcRegionsResponse, GcTableRequest, GcTableResponse, HeartbeatRequest, HeartbeatResponse, + MigrateRegionRequest, MigrateRegionResponse, Peer, ProcedureDetailRequest, + ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ReconcileRequest, + ReconcileResponse, ResponseHeader, Role, }; use async_trait::async_trait; use common_error::status_code::StatusCode; @@ -435,6 +523,20 @@ mod tests { ) -> Result, Status> { Err(Status::unimplemented("details is not used in this test")) } + + async fn gc_regions( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("gc_regions is not used in this test")) + } + + async fn gc_table( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("gc_table is not used in this test")) + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/meta-srv/src/gc.rs b/src/meta-srv/src/gc.rs index c9d346e5f8..f9b3869bc6 100644 --- a/src/meta-srv/src/gc.rs +++ b/src/meta-srv/src/gc.rs @@ -31,7 +31,7 @@ mod util; pub use options::GcSchedulerOptions; pub use procedure::BatchGcProcedure; -pub use scheduler::{Event, GcScheduler, GcTickerRef}; +pub use scheduler::{Event, GcJobReport, GcScheduler, GcTickerRef}; /// Mapping from region ID to its associated peers (leader and followers). pub type Region2Peers = HashMap)>; diff --git a/src/meta-srv/src/gc/dropped.rs b/src/meta-srv/src/gc/dropped.rs index 8d5af4eaca..b4793ee556 100644 --- a/src/meta-srv/src/gc/dropped.rs +++ b/src/meta-srv/src/gc/dropped.rs @@ -88,6 +88,16 @@ impl<'a> DroppedRegionCollector<'a> { pub async fn collect_and_assign( &self, table_reparts: &[(TableId, TableRepartValue)], + ) -> Result { + self.collect_and_assign_with_cooldown(table_reparts, true) + .await + } + + /// Collect and assign dropped regions for GC with optional cooldown filtering. + pub async fn collect_and_assign_with_cooldown( + &self, + table_reparts: &[(TableId, TableRepartValue)], + apply_cooldown: bool, ) -> Result { // get active region ids for all tables involved in repartitioning let active_region_ids: HashSet = { @@ -113,7 +123,11 @@ impl<'a> DroppedRegionCollector<'a> { return Ok(DroppedRegionAssignment::default()); } - let dropped_regions = self.filter_by_cooldown(dropped_regions).await; + let dropped_regions = if apply_cooldown { + self.filter_by_cooldown(dropped_regions).await + } else { + dropped_regions + }; if dropped_regions.is_empty() { return Ok(DroppedRegionAssignment::default()); diff --git a/src/meta-srv/src/gc/handler.rs b/src/meta-srv/src/gc/handler.rs index ccbfd00f6d..105ddca58c 100644 --- a/src/meta-srv/src/gc/handler.rs +++ b/src/meta-srv/src/gc/handler.rs @@ -114,12 +114,27 @@ impl GcScheduler { .await; let duration = start_time.elapsed(); - info!( - "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}", - report.per_datanode_reports.len(), - report.failed_datanodes.len(), - duration - ); + match &report { + GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + info!( + "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}", + per_datanode_reports.len(), + failed_datanodes.len(), + duration + ); + } + GcJobReport::Combined { report } => { + info!( + "Finished GC cycle with combined report. Deleted files: {}, deleted indexes: {}. Duration: {:?}", + report.deleted_files.len(), + report.deleted_indexes.len(), + duration + ); + } + } debug!("Detailed GC Job Report: {report:#?}"); Ok(report) @@ -196,7 +211,8 @@ impl GcScheduler { force_full_listing_by_peer: HashMap>, region_routes_override_by_peer: HashMap, ) -> GcJobReport { - let mut report = GcJobReport::default(); + let mut per_datanode_reports = HashMap::new(); + let mut failed_datanodes: HashMap<_, Vec<_>> = HashMap::new(); // Create a stream of datanode GC tasks with limited concurrency let results: Vec<_> = futures::stream::iter( @@ -237,18 +253,21 @@ impl GcScheduler { for (peer, result) in results { match result { Ok(dn_report) => { - report.per_datanode_reports.insert(peer.id, dn_report); + per_datanode_reports.insert(peer.id, dn_report); } Err(e) => { error!(e; "Failed to process datanode GC for peer {}", peer); // Note: We don't have a direct way to map peer to table_id here, // so we just log the error. The table_reports will contain individual region failures. - report.failed_datanodes.entry(peer.id).or_default().push(e); + failed_datanodes.entry(peer.id).or_default().push(e); } } } - report + GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } } /// Process GC for a single datanode with all its candidate regions. diff --git a/src/meta-srv/src/gc/mock.rs b/src/meta-srv/src/gc/mock.rs index 67e886fba3..6c736490ab 100644 --- a/src/meta-srv/src/gc/mock.rs +++ b/src/meta-srv/src/gc/mock.rs @@ -50,13 +50,16 @@ pub const TEST_REGION_SIZE_200MB: u64 = 200_000_000; /// Helper function to create an empty GcReport for the given region IDs pub fn new_empty_report_with(region_ids: impl IntoIterator) -> GcReport { let mut deleted_files = HashMap::new(); + let mut processed_regions = HashSet::new(); for region_id in region_ids { deleted_files.insert(region_id, vec![]); + processed_regions.insert(region_id); } GcReport { deleted_files, deleted_indexes: HashMap::new(), need_retry_regions: HashSet::new(), + processed_regions, } } diff --git a/src/meta-srv/src/gc/mock/basic.rs b/src/meta-srv/src/gc/mock/basic.rs index f0455568bf..fb395d899e 100644 --- a/src/meta-srv/src/gc/mock/basic.rs +++ b/src/meta-srv/src/gc/mock/basic.rs @@ -34,8 +34,18 @@ async fn test_parallel_process_datanodes_empty() { .parallel_process_datanodes(HashMap::new(), HashMap::new(), HashMap::new()) .await; - assert_eq!(report.per_datanode_reports.len(), 0); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 0); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } } #[tokio::test] @@ -88,8 +98,18 @@ async fn test_parallel_process_datanodes_with_candidates() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - assert_eq!(report.per_datanode_reports.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } } #[tokio::test] @@ -140,16 +160,18 @@ async fn test_handle_tick() { let report = scheduler.handle_tick().await.unwrap(); // Validate the returned GcJobReport - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have 0 failed datanodes" - ); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode"); + assert_eq!(failed_datanodes.len(), 0, "Should have 0 failed datanodes"); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1); assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1); diff --git a/src/meta-srv/src/gc/mock/concurrent.rs b/src/meta-srv/src/gc/mock/concurrent.rs index 40cbb15168..2554c7046b 100644 --- a/src/meta-srv/src/gc/mock/concurrent.rs +++ b/src/meta-srv/src/gc/mock/concurrent.rs @@ -100,8 +100,18 @@ async fn test_concurrent_table_processing_limits() { .await; // Should process all datanodes - assert_eq!(report.per_datanode_reports.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } } #[tokio::test] @@ -172,11 +182,21 @@ async fn test_datanode_processes_tables_with_partial_gc_failures() { .await; // Should have one datanode with mixed results - assert_eq!(report.per_datanode_reports.len(), 1); + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // also check one failed region (region2 has no GC report, so it should be in need_retry_regions) - let datanode_report = report.per_datanode_reports.values().next().unwrap(); assert_eq!(datanode_report.need_retry_regions.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); } // Region Concurrency Tests @@ -376,7 +396,15 @@ async fn test_region_gc_concurrency_with_partial_failures() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - let report = report.per_datanode_reports.get(&peer.id).unwrap(); + let report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + .. + } => per_datanode_reports.get(&peer.id).unwrap(), + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // Should have 3 successful and 3 failed regions // Even regions (2, 4, 6) should succeed, odd regions (1, 3, 5) should fail @@ -506,7 +534,15 @@ async fn test_region_gc_concurrency_with_retryable_errors() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - let report = report.per_datanode_reports.get(&peer.id).unwrap(); + let report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + .. + } => per_datanode_reports.get(&peer.id).unwrap(), + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // In the new implementation without retry logic, all regions should be processed // The exact behavior depends on how the mock handles the regions diff --git a/src/meta-srv/src/gc/mock/err_handle.rs b/src/meta-srv/src/gc/mock/err_handle.rs index 77671fd177..0dd9b3d115 100644 --- a/src/meta-srv/src/gc/mock/err_handle.rs +++ b/src/meta-srv/src/gc/mock/err_handle.rs @@ -93,19 +93,30 @@ async fn test_gc_regions_failure_handling() { let report = scheduler.handle_tick().await.unwrap(); // Validate the report shows the failure handling - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode despite failure" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have 0 failed datanodes (failure handled via need_retry_regions)" - ); + // Check the report shows the failure handling + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!( + per_datanode_reports.len(), + 1, + "Should process 1 datanode despite failure" + ); + assert_eq!( + failed_datanodes.len(), + 0, + "Should have 0 failed datanodes (failure handled via need_retry_regions)" + ); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // Check that the region is in need_retry_regions due to the failure - let datanode_report = report.per_datanode_reports.values().next().unwrap(); assert_eq!( datanode_report.need_retry_regions.len(), 1, @@ -180,19 +191,25 @@ async fn test_get_file_references_failure() { // Validate the report shows the expected results // In the new implementation, even if get_file_references fails, we still create a datanode report - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have 0 failed datanodes (failure handled gracefully)" - ); + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode"); + assert_eq!( + failed_datanodes.len(), + 0, + "Should have 0 failed datanodes (failure handled gracefully)" + ); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // The region should be processed but may have empty results due to file refs failure - let datanode_report = report.per_datanode_reports.values().next().unwrap(); // The current implementation still processes the region even with file refs failure // and creates an empty entry in deleted_files assert!( diff --git a/src/meta-srv/src/gc/mock/integration.rs b/src/meta-srv/src/gc/mock/integration.rs index 36c21e0770..8aa8f977ad 100644 --- a/src/meta-srv/src/gc/mock/integration.rs +++ b/src/meta-srv/src/gc/mock/integration.rs @@ -86,19 +86,19 @@ async fn test_full_gc_workflow() { let report = scheduler.handle_tick().await.unwrap(); // Validate the returned GcJobReport - should have 1 datanode report - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have no failed datanodes" - ); - - // Get the datanode report - let datanode_report = report.per_datanode_reports.values().next().unwrap(); + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode"); + assert_eq!(failed_datanodes.len(), 0, "Should have no failed datanodes"); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // Check that the region was processed successfully assert!( @@ -216,19 +216,19 @@ async fn test_tracker_cleanup() { let report = scheduler.handle_tick().await.unwrap(); // Validate the returned GcJobReport - should have 1 datanode report - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have no failed datanodes" - ); - - // Get the datanode report - let datanode_report = report.per_datanode_reports.values().next().unwrap(); + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode"); + assert_eq!(failed_datanodes.len(), 0, "Should have no failed datanodes"); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // Check that the region was processed successfully assert!( diff --git a/src/meta-srv/src/gc/mock/misc.rs b/src/meta-srv/src/gc/mock/misc.rs index 5aa7256edd..76d14136e4 100644 --- a/src/meta-srv/src/gc/mock/misc.rs +++ b/src/meta-srv/src/gc/mock/misc.rs @@ -73,8 +73,18 @@ async fn test_empty_file_refs_manifest() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - assert_eq!(report.per_datanode_reports.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } // Should handle empty file refs gracefully } @@ -150,6 +160,16 @@ async fn test_multiple_regions_per_table() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - assert_eq!(report.per_datanode_reports.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } } diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs index a275c64452..772c7e7cb6 100644 --- a/src/meta-srv/src/gc/scheduler.rs +++ b/src/meta-srv/src/gc/scheduler.rs @@ -12,23 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use common_meta::DatanodeId; use common_meta::key::TableMetadataManagerRef; use common_procedure::ProcedureManagerRef; use common_telemetry::tracing::Instrument as _; use common_telemetry::{error, info}; -use store_api::storage::GcReport; +use store_api::storage::{GcReport, RegionId}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{Mutex, oneshot}; use crate::cluster::MetaPeerClientRef; use crate::define_ticker; use crate::error::{Error, Result}; +use crate::gc::Region2Peers; use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx}; +use crate::gc::dropped::DroppedRegionCollector; use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL}; use crate::gc::tracker::RegionGcTracker; use crate::metrics::{ @@ -37,10 +39,46 @@ use crate::metrics::{ use crate::service::mailbox::MailboxRef; /// Report for a GC job. -#[derive(Debug, Default)] -pub struct GcJobReport { - pub per_datanode_reports: HashMap, - pub failed_datanodes: HashMap>, +#[derive(Debug)] +pub enum GcJobReport { + PerDatanode { + per_datanode_reports: HashMap, + failed_datanodes: HashMap>, + }, + Combined { + report: GcReport, + }, +} + +impl Default for GcJobReport { + fn default() -> Self { + Self::PerDatanode { + per_datanode_reports: HashMap::new(), + failed_datanodes: HashMap::new(), + } + } +} + +impl GcJobReport { + pub fn combined(report: GcReport) -> Self { + Self::Combined { report } + } + + pub fn merge_to_report(self) -> GcReport { + match self { + GcJobReport::Combined { report } => report, + GcJobReport::PerDatanode { + per_datanode_reports, + .. + } => { + let mut combined = GcReport::default(); + for (_datanode_id, report) in per_datanode_reports { + combined.merge(report); + } + combined + } + } + } } /// [`Event`] represents various types of events that can be processed by the gc ticker. @@ -48,10 +86,20 @@ pub struct GcJobReport { /// Variants: /// - `Tick`: This event is used to trigger gc periodically. /// - `Manually`: This event is used to trigger a manual gc run and provides a channel -/// to send back the [`GcJobReport`] for that run. +/// to send back the result for that run. +/// Optional parameters allow specifying target regions and GC behavior. pub enum Event { Tick, - Manually(oneshot::Sender), + Manually { + /// Channel sender to return the GC job report or error + sender: oneshot::Sender>, + /// Optional specific region IDs to GC. If None, scheduler will select candidates automatically. + region_ids: Option>, + /// Optional override for full file listing. If None, uses scheduler config. + full_file_listing: Option, + /// Optional override for timeout. If None, uses scheduler config. + timeout: Option, + }, } #[allow(unused)] @@ -130,19 +178,23 @@ impl GcScheduler { error!(e; "Failed to handle gc tick"); } } - Event::Manually(sender) => { + Event::Manually { + sender, + region_ids, + full_file_listing, + timeout, + } => { info!("Received manually gc request"); let span = common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual"); - match self.handle_tick().instrument(span).await { - Ok(report) => { - // ignore error - let _ = sender.send(report); - } - Err(e) => { - error!(e; "Failed to handle gc tick"); - } - }; + let result = self + .handle_manual_gc(region_ids, full_file_listing, timeout) + .instrument(span) + .await; + if let Err(e) = &result { + error!(e; "Failed to handle manual gc"); + } + let _ = sender.send(result); } } } @@ -162,4 +214,162 @@ impl GcScheduler { Ok(report) } + + /// Handles a manual GC request with optional specific parameters. + /// + /// If `region_ids` is specified, GC will be performed only on those regions. + /// Otherwise, falls back to automatic candidate selection. + pub(crate) async fn handle_manual_gc( + &self, + region_ids: Option>, + full_file_listing: Option, + timeout: Option, + ) -> Result { + info!("Start to handle manual gc request"); + + // No specific regions, use default tick behavior + let Some(regions) = region_ids else { + let report = self.trigger_gc().await?; + info!("Finished manual gc request"); + return Ok(report); + }; + + // Empty regions list, return empty report + if regions.is_empty() { + info!("Finished manual gc request"); + return Ok(GcJobReport::combined(GcReport::default())); + } + + let full_listing = full_file_listing.unwrap_or(false); + let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout); + + let region_set: HashSet = regions.iter().copied().collect(); + let table_reparts = self.ctx.get_table_reparts().await?; + let dropped_collector = + DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker); + let dropped_assignment = dropped_collector + .collect_and_assign_with_cooldown(&table_reparts, false) + .await?; + + let mut dropped_region_set = HashSet::new(); + let mut dropped_routes_override = Region2Peers::new(); + for overrides in dropped_assignment.region_routes_override.into_values() { + for (region_id, route) in overrides { + if region_set.contains(®ion_id) { + dropped_region_set.insert(region_id); + dropped_routes_override.insert(region_id, route); + } + } + } + + let (dropped_regions, active_regions): (Vec<_>, Vec<_>) = regions + .into_iter() + .partition(|region_id| dropped_region_set.contains(region_id)); + + let mut combined_report = GcReport::default(); + + if !active_regions.is_empty() { + let report = self + .ctx + .gc_regions( + &active_regions, + full_listing, + gc_timeout, + Region2Peers::new(), + ) + .await?; + combined_report.merge(report); + } + + if !dropped_regions.is_empty() { + let report = self + .ctx + .gc_regions(&dropped_regions, true, gc_timeout, dropped_routes_override) + .await?; + combined_report.merge(report); + } + + let report = GcJobReport::combined(combined_report); + + info!("Finished manual gc request"); + Ok(report) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::time::Duration; + + use common_meta::datanode::RegionStat; + use common_meta::key::table_repart::TableRepartValue; + use common_meta::key::table_route::PhysicalTableRouteValue; + use store_api::storage::RegionId; + use table::metadata::TableId; + + use super::*; + + struct ErrorMockSchedulerCtx; + + #[async_trait::async_trait] + impl SchedulerCtx for ErrorMockSchedulerCtx { + async fn get_table_to_region_stats(&self) -> Result>> { + Ok(HashMap::new()) + } + + async fn get_table_reparts(&self) -> Result> { + Ok(vec![]) + } + + async fn get_table_route( + &self, + _table_id: TableId, + ) -> Result<(TableId, PhysicalTableRouteValue)> { + unreachable!("get_table_route should not be called in this test") + } + + async fn batch_get_table_route( + &self, + _table_ids: &[TableId], + ) -> Result> { + Ok(HashMap::new()) + } + + async fn gc_regions( + &self, + _region_ids: &[RegionId], + _full_file_listing: bool, + _timeout: Duration, + _region_routes_override: Region2Peers, + ) -> Result { + crate::error::UnexpectedSnafu { + violated: "mock gc failure".to_string(), + } + .fail() + } + } + + #[tokio::test] + async fn test_handle_manual_gc_propagates_error() { + let (tx, rx) = GcScheduler::channel(); + drop(tx); + + let scheduler = GcScheduler { + ctx: Arc::new(ErrorMockSchedulerCtx), + receiver: rx, + config: GcSchedulerOptions::default(), + region_gc_tracker: Arc::new(Mutex::new(HashMap::new())), + last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())), + }; + + let result = scheduler + .handle_manual_gc( + Some(vec![RegionId::new(1, 0)]), + Some(false), + Some(Duration::from_secs(1)), + ) + .await; + + assert!(result.is_err()); + } } diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index 90fb66f39c..25add392a6 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -16,24 +16,32 @@ use std::time::Duration; use api::v1::meta::reconcile_request::Target; use api::v1::meta::{ - DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, + DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, GcRegionsRequest, + GcRegionsResponse, GcStats, GcTableRequest, GcTableResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest, ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server, }; +use common_meta::key::TableMetadataManagerRef; +use common_meta::key::table_name::TableNameKey; use common_meta::procedure_executor::ExecutorContext; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; -use common_meta::rpc::procedure; +use common_meta::rpc::procedure::{ + self, GcRegionsRequest as MetaGcRegionsRequest, GcResponse, + GcTableRequest as MetaGcTableRequest, +}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use table::table_reference::TableReference; use tonic::Request; +use crate::error::{TableMetadataManagerSnafu, TableNotFoundSnafu}; use crate::metasrv::Metasrv; use crate::procedure::region_migration::manager::{ RegionMigrationProcedureTask, RegionMigrationTriggerReason, }; use crate::service::GrpcResult; -use crate::{check_leader, error}; +use crate::{check_leader, error, gc}; #[async_trait::async_trait] impl procedure_service_server::ProcedureService for Metasrv { @@ -240,4 +248,210 @@ impl procedure_service_server::ProcedureService for Metasrv { metas, ))) } + + async fn gc_regions( + &self, + request: Request, + ) -> GrpcResult { + check_leader!(self, request, GcRegionsResponse, "`gc_regions`"); + + let GcRegionsRequest { + header, + region_ids, + full_file_listing, + timeout_secs, + } = request.into_inner(); + + let _header = header.context(error::MissingRequestHeaderSnafu)?; + + let response = self + .handle_gc_regions(MetaGcRegionsRequest { + region_ids, + full_file_listing, + timeout: Duration::from_secs(timeout_secs as u64), + }) + .await?; + + Ok(Response::new(gc_response_to_regions_pb(response))) + } + + async fn gc_table(&self, request: Request) -> GrpcResult { + check_leader!(self, request, GcTableResponse, "`gc_table`"); + + let GcTableRequest { + header, + catalog_name, + schema_name, + table_name, + full_file_listing, + timeout_secs, + } = request.into_inner(); + + let _header = header.context(error::MissingRequestHeaderSnafu)?; + + let response = self + .handle_gc_table(MetaGcTableRequest { + catalog_name, + schema_name, + table_name, + full_file_listing, + timeout: Duration::from_secs(timeout_secs as u64), + }) + .await?; + + Ok(Response::new(gc_response_to_table_pb(response))) + } +} + +impl Metasrv { + fn normalize_gc_timeout(timeout: Duration) -> Option { + if timeout.is_zero() { + None + } else { + Some(timeout) + } + } + + async fn handle_gc_regions(&self, request: MetaGcRegionsRequest) -> error::Result { + let region_ids: Vec = request + .region_ids + .into_iter() + .map(RegionId::from_u64) + .collect(); + self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout) + .await + } + + async fn handle_gc_table(&self, request: MetaGcTableRequest) -> error::Result { + let table_name_key = TableNameKey::new( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ); + + let table_metadata_manager: &TableMetadataManagerRef = self.table_metadata_manager(); + let table_id = table_metadata_manager + .table_name_manager() + .get(table_name_key) + .await + .context(TableMetadataManagerSnafu)? + .context(TableNotFoundSnafu { + name: request.table_name.clone(), + })? + .table_id(); + + let (_phy_table_id, route) = table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await + .context(TableMetadataManagerSnafu)?; + + let region_ids: Vec = route.region_routes.iter().map(|r| r.region.id).collect(); + self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout) + .await + } + + /// Triggers manual GC for specified regions and returns the GC response. + async fn trigger_gc_for_regions( + &self, + region_ids: Vec, + full_file_listing: bool, + timeout: Duration, + ) -> error::Result { + let timeout = Self::normalize_gc_timeout(timeout); + let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu { + violated: "GC ticker not available".to_string(), + })?; + + let (tx, rx) = tokio::sync::oneshot::channel(); + gc_ticker + .sender + .send(gc::Event::Manually { + sender: tx, + region_ids: Some(region_ids), + full_file_listing: Some(full_file_listing), + timeout, + }) + .await + .map_err(|_| { + error::UnexpectedSnafu { + violated: "Failed to send GC event".to_string(), + } + .build() + })?; + + let job_report = rx.await.map_err(|_| { + error::UnexpectedSnafu { + violated: "GC job channel closed unexpectedly".to_string(), + } + .build() + })??; + + let report = gc_job_report_to_gc_report(job_report); + + Ok(gc_report_to_response(&report)) + } +} + +fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::storage::GcReport { + job_report.merge_to_report() +} + +fn gc_report_to_response(report: &store_api::storage::GcReport) -> GcResponse { + let deleted_files = report.deleted_files.values().map(|v| v.len() as u64).sum(); + let deleted_indexes = report + .deleted_indexes + .values() + .map(|v| v.len() as u64) + .sum(); + GcResponse { + processed_regions: report.processed_regions.len() as u64, + need_retry_regions: report + .need_retry_regions + .iter() + .map(|id| id.as_u64()) + .collect(), + deleted_files, + deleted_indexes, + } +} + +fn gc_response_to_regions_pb(resp: GcResponse) -> GcRegionsResponse { + GcRegionsResponse { + stats: Some(GcStats { + processed_regions: resp.processed_regions, + need_retry_regions: resp.need_retry_regions, + deleted_files: resp.deleted_files, + deleted_indexes: resp.deleted_indexes, + }), + ..Default::default() + } +} + +fn gc_response_to_table_pb(resp: GcResponse) -> GcTableResponse { + GcTableResponse { + stats: Some(GcStats { + processed_regions: resp.processed_regions, + need_retry_regions: resp.need_retry_regions, + deleted_files: resp.deleted_files, + deleted_indexes: resp.deleted_indexes, + }), + ..Default::default() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::Metasrv; + + #[test] + fn test_normalize_gc_timeout() { + assert_eq!(Metasrv::normalize_gc_timeout(Duration::ZERO), None); + assert_eq!( + Metasrv::normalize_gc_timeout(Duration::from_secs(10)), + Some(Duration::from_secs(10)) + ); + } } diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index b1468c0055..5b35909c99 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -282,6 +282,7 @@ impl LocalGcWorker { let mut deleted_files = HashMap::new(); let mut deleted_indexes = HashMap::new(); + let mut processed_regions = HashSet::new(); let tmp_ref_files = self.read_tmp_ref_files().await?; for (region_id, region) in &self.regions { let per_region_time = std::time::Instant::now(); @@ -309,6 +310,7 @@ impl LocalGcWorker { .collect_vec(); deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect()); deleted_indexes.insert(*region_id, index_files); + processed_regions.insert(*region_id); debug!( "GC for region {} took {} secs.", region_id, @@ -323,6 +325,7 @@ impl LocalGcWorker { deleted_files, deleted_indexes, need_retry_regions: HashSet::new(), + processed_regions, }; Ok(report) } diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs index d6a2fbe03f..d98c09b77c 100644 --- a/src/operator/src/procedure.rs +++ b/src/operator/src/procedure.rs @@ -19,7 +19,9 @@ use common_error::ext::BoxedError; use common_function::handlers::ProcedureServiceHandler; use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef}; use common_meta::rpc::procedure::{ - ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse, + GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, + ProcedureStateResponse, }; use common_query::error as query_error; use common_query::error::Result as QueryResult; @@ -90,4 +92,20 @@ impl ProcedureServiceHandler for ProcedureServiceOperator { fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } + + async fn gc_regions(&self, request: MetaGcRegionsRequest) -> QueryResult { + self.procedure_executor + .gc_regions(&ExecutorContext::default(), request) + .await + .map_err(BoxedError::new) + .context(query_error::ProcedureServiceSnafu) + } + + async fn gc_table(&self, request: MetaGcTableRequest) -> QueryResult { + self.procedure_executor + .gc_table(&ExecutorContext::default(), request) + .await + .map_err(BoxedError::new) + .context(query_error::ProcedureServiceSnafu) + } } diff --git a/src/store-api/src/storage/file.rs b/src/store-api/src/storage/file.rs index 341a4c301a..8acc0e3b93 100644 --- a/src/store-api/src/storage/file.rs +++ b/src/store-api/src/storage/file.rs @@ -114,6 +114,8 @@ pub struct GcReport { pub deleted_indexes: HashMap>, /// Regions that need retry in next gc round, usually because their tmp ref files are outdated pub need_retry_regions: HashSet, + /// Regions successfully processed in this GC run + pub processed_regions: HashSet, } impl GcReport { @@ -126,6 +128,7 @@ impl GcReport { deleted_files, deleted_indexes, need_retry_regions, + processed_regions: HashSet::new(), } } @@ -139,7 +142,17 @@ impl GcReport { ); *self_files = dedup.into_iter().collect(); } + for (region, files) in other.deleted_indexes { + let self_files = self.deleted_indexes.entry(region).or_default(); + let dedup: HashSet<(FileId, IndexVersion)> = HashSet::from_iter( + std::mem::take(self_files) + .into_iter() + .chain(files.iter().cloned()), + ); + *self_files = dedup.into_iter().collect(); + } self.need_retry_regions.extend(other.need_retry_regions); + self.processed_regions.extend(other.processed_regions); // Remove regions that have succeeded from need_retry_regions self.need_retry_regions .retain(|region| !self.deleted_files.contains_key(region)); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 0c232c3c69..e5e6321412 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -927,6 +927,15 @@ pub async fn execute_sql(instance: &Arc, sql: &str) -> Output { .unwrap() } +pub async fn try_execute_sql( + instance: &Arc, + sql: &str, +) -> servers::error::Result { + SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc()) + .await + .remove(0) +} + pub async fn execute_sql_and_expect(instance: &Arc, sql: &str, expected: &str) { let output = execute_sql(instance, sql).await; let output = output.data.pretty_print().await; diff --git a/tests-integration/src/tests/gc.rs b/tests-integration/src/tests/gc.rs index 1502099e7a..badaef5f7a 100644 --- a/tests-integration/src/tests/gc.rs +++ b/tests-integration/src/tests/gc.rs @@ -33,6 +33,7 @@ use crate::cluster::GreptimeDbClusterBuilder; use crate::test_util::{StorageType, TempDirGuard, execute_sql, get_test_store_config}; use crate::tests::test_util::{MockInstanceBuilder, TestContext, wait_procedure}; +mod admin; mod repart; /// Helper function to get table route information for GC procedure diff --git a/tests-integration/src/tests/gc/admin.rs b/tests-integration/src/tests/gc/admin.rs new file mode 100644 index 0000000000..b9e08392f5 --- /dev/null +++ b/tests-integration/src/tests/gc/admin.rs @@ -0,0 +1,518 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Integration tests for `ADMIN GC_TABLE` and `ADMIN GC_REGIONS` functions. + +use client::OutputData; +use common_meta::key::table_repart::TableRepartValue; +use common_recordbatch::RecordBatches; +use common_telemetry::info; +use common_test_util::recordbatch::check_output_stream; +use datatypes::arrow::array::{Array, AsArray}; +use datatypes::arrow::datatypes::UInt64Type; +use store_api::storage::RegionId; + +use super::{distributed_with_gc, list_sst_files}; +use crate::test_util::{StorageType, execute_sql, try_execute_sql}; + +#[tokio::test] +async fn test_admin_gc_table_different_store() { + let e = dotenv::dotenv(); + common_telemetry::init_default_ut_logging(); + let store_type = StorageType::build_storage_types_based_on_env(); + info!("store type with .env path: {e:?}: {:?}", store_type); + for store in store_type { + info!("Running admin GC table test with storage type: {}", store); + test_admin_gc_table(&store).await; + } +} + +/// Test `ADMIN GC_TABLE('table_name')` function +async fn test_admin_gc_table(store_type: &StorageType) { + let (test_context, _guard) = distributed_with_gc(store_type).await; + let instance = test_context.frontend(); + + // Step 1: Create table with append_mode to easily generate multiple files + let create_table_sql = r#" + CREATE TABLE test_admin_gc_table ( + ts TIMESTAMP TIME INDEX, + val DOUBLE, + host STRING + ) WITH (append_mode = 'true') + "#; + execute_sql(&instance, create_table_sql).await; + + // Step 2: Generate SST files by inserting data and flushing multiple times + for i in 0..4 { + let insert_sql = format!( + r#" + INSERT INTO test_admin_gc_table (ts, val, host) VALUES + ('2023-01-0{} 10:00:00', {}, 'host{}'), + ('2023-01-0{} 11:00:00', {}, 'host{}'), + ('2023-01-0{} 12:00:00', {}, 'host{}') + "#, + i + 1, + 10.0 + i as f64, + i, + i + 1, + 20.0 + i as f64, + i, + i + 1, + 30.0 + i as f64, + i + ); + execute_sql(&instance, &insert_sql).await; + + // Flush the table to create SST files + let flush_sql = "ADMIN FLUSH_TABLE('test_admin_gc_table')"; + execute_sql(&instance, flush_sql).await; + } + + // List SST files before compaction (for verification) + let sst_files_before_compaction = list_sst_files(&test_context).await; + info!( + "SST files before compaction: {:?}", + sst_files_before_compaction + ); + assert_eq!(sst_files_before_compaction.len(), 4); // 4 files from 4 flushes + + // Step 3: Trigger compaction to create garbage SST files + let compact_sql = "ADMIN COMPACT_TABLE('test_admin_gc_table')"; + execute_sql(&instance, compact_sql).await; + + // List SST files after compaction (should have both old and new files) + let sst_files_after_compaction = list_sst_files(&test_context).await; + info!( + "SST files after compaction: {:?}", + sst_files_after_compaction + ); + assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new + + // Step 4: Use ADMIN GC_TABLE to clean up garbage files + let gc_table_sql = "ADMIN GC_TABLE('test_admin_gc_table')"; + let gc_output = execute_sql(&instance, gc_table_sql).await; + info!("GC_TABLE output: {:?}", gc_output); + + // Step 5: Verify GC results + let sst_files_after_gc = list_sst_files(&test_context).await; + info!("SST files after GC: {:?}", sst_files_after_gc); + assert_eq!(sst_files_after_gc.len(), 1); // Only the compacted file should remain after gc + + // Verify that data is still accessible + let count_sql = "SELECT COUNT(*) FROM test_admin_gc_table"; + let count_output = execute_sql(&instance, count_sql).await; + let expected = r#" ++----------+ +| count(*) | ++----------+ +| 12 | ++----------+"# + .trim(); + check_output_stream(count_output.data, expected).await; + + let select_sql = "SELECT * FROM test_admin_gc_table ORDER BY ts"; + let select_output = execute_sql(&instance, select_sql).await; + let expected = r#" ++---------------------+------+-------+ +| ts | val | host | ++---------------------+------+-------+ +| 2023-01-01T10:00:00 | 10.0 | host0 | +| 2023-01-01T11:00:00 | 20.0 | host0 | +| 2023-01-01T12:00:00 | 30.0 | host0 | +| 2023-01-02T10:00:00 | 11.0 | host1 | +| 2023-01-02T11:00:00 | 21.0 | host1 | +| 2023-01-02T12:00:00 | 31.0 | host1 | +| 2023-01-03T10:00:00 | 12.0 | host2 | +| 2023-01-03T11:00:00 | 22.0 | host2 | +| 2023-01-03T12:00:00 | 32.0 | host2 | +| 2023-01-04T10:00:00 | 13.0 | host3 | +| 2023-01-04T11:00:00 | 23.0 | host3 | +| 2023-01-04T12:00:00 | 33.0 | host3 | ++---------------------+------+-------+"# + .trim(); + check_output_stream(select_output.data, expected).await; + + info!("ADMIN GC_TABLE test completed successfully"); +} + +#[tokio::test] +async fn test_admin_gc_regions_different_store() { + let _ = dotenv::dotenv(); + common_telemetry::init_default_ut_logging(); + let store_type = StorageType::build_storage_types_based_on_env(); + info!("store type: {:?}", store_type); + for store in store_type { + info!("Running admin GC regions test with storage type: {}", store); + test_admin_gc_regions(&store).await; + } +} + +/// Test `ADMIN GC_REGIONS(region_id)` function +async fn test_admin_gc_regions(store_type: &StorageType) { + let (test_context, _guard) = distributed_with_gc(store_type).await; + let instance = test_context.frontend(); + + // Step 1: Create table with append_mode to easily generate multiple files + let create_table_sql = r#" + CREATE TABLE test_admin_gc_regions ( + ts TIMESTAMP TIME INDEX, + val DOUBLE, + host STRING + ) WITH (append_mode = 'true') + "#; + execute_sql(&instance, create_table_sql).await; + + // Step 2: Generate SST files by inserting data and flushing multiple times + for i in 0..4 { + let insert_sql = format!( + r#" + INSERT INTO test_admin_gc_regions (ts, val, host) VALUES + ('2023-01-0{} 10:00:00', {}, 'host{}'), + ('2023-01-0{} 11:00:00', {}, 'host{}'), + ('2023-01-0{} 12:00:00', {}, 'host{}') + "#, + i + 1, + 10.0 + i as f64, + i, + i + 1, + 20.0 + i as f64, + i, + i + 1, + 30.0 + i as f64, + i + ); + execute_sql(&instance, &insert_sql).await; + + // Flush the table to create SST files + let flush_sql = "ADMIN FLUSH_TABLE('test_admin_gc_regions')"; + execute_sql(&instance, flush_sql).await; + } + + // List SST files before compaction (for verification) + let sst_files_before_compaction = list_sst_files(&test_context).await; + info!( + "SST files before compaction: {:?}", + sst_files_before_compaction + ); + assert_eq!(sst_files_before_compaction.len(), 4); // 4 files from 4 flushes + + // Step 3: Trigger compaction to create garbage SST files + let compact_sql = "ADMIN COMPACT_TABLE('test_admin_gc_regions')"; + execute_sql(&instance, compact_sql).await; + + // List SST files after compaction (should have both old and new files) + let sst_files_after_compaction = list_sst_files(&test_context).await; + info!( + "SST files after compaction: {:?}", + sst_files_after_compaction + ); + assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new + + // Step 4: Get region id from information_schema + let region_id_sql = "SELECT greptime_partition_id FROM information_schema.partitions WHERE table_name = 'test_admin_gc_regions' ORDER BY greptime_partition_id LIMIT 1"; + let region_output = execute_sql(&instance, region_id_sql).await; + info!("Region ID output: {:?}", region_output); + let OutputData::Stream(region_stream) = region_output.data else { + panic!("Expected stream output for region id query"); + }; + + // Extract the region id from the result + let region_id = { + let batches = RecordBatches::try_collect(region_stream).await.unwrap(); + let batch = &batches.iter().next().unwrap(); + let column = batch.column(0); + let array = column.as_primitive::(); + array.value(0) + }; + info!("Extracted region_id: {}", region_id); + + // Step 5: Use ADMIN GC_REGIONS to clean up garbage files for the specific region + let gc_regions_sql = format!("ADMIN GC_REGIONS({})", region_id); + let gc_output = execute_sql(&instance, &gc_regions_sql).await; + info!("GC_REGIONS output: {:?}", gc_output); + + // Step 6: Verify GC results + let sst_files_after_gc = list_sst_files(&test_context).await; + info!("SST files after GC: {:?}", sst_files_after_gc); + assert_eq!(sst_files_after_gc.len(), 1); // Only the compacted file should remain after gc + + // Verify that data is still accessible + let count_sql = "SELECT COUNT(*) FROM test_admin_gc_regions"; + let count_output = execute_sql(&instance, count_sql).await; + let expected = r#" ++----------+ +| count(*) | ++----------+ +| 12 | ++----------+"# + .trim(); + check_output_stream(count_output.data, expected).await; + + let select_sql = "SELECT * FROM test_admin_gc_regions ORDER BY ts"; + let select_output = execute_sql(&instance, select_sql).await; + let expected = r#" ++---------------------+------+-------+ +| ts | val | host | ++---------------------+------+-------+ +| 2023-01-01T10:00:00 | 10.0 | host0 | +| 2023-01-01T11:00:00 | 20.0 | host0 | +| 2023-01-01T12:00:00 | 30.0 | host0 | +| 2023-01-02T10:00:00 | 11.0 | host1 | +| 2023-01-02T11:00:00 | 21.0 | host1 | +| 2023-01-02T12:00:00 | 31.0 | host1 | +| 2023-01-03T10:00:00 | 12.0 | host2 | +| 2023-01-03T11:00:00 | 22.0 | host2 | +| 2023-01-03T12:00:00 | 32.0 | host2 | +| 2023-01-04T10:00:00 | 13.0 | host3 | +| 2023-01-04T11:00:00 | 23.0 | host3 | +| 2023-01-04T12:00:00 | 33.0 | host3 | ++---------------------+------+-------+"# + .trim(); + check_output_stream(select_output.data, expected).await; + + info!("ADMIN GC_REGIONS test completed successfully"); +} + +#[tokio::test] +async fn test_admin_gc_missing_cases_different_store() { + let _ = dotenv::dotenv(); + common_telemetry::init_default_ut_logging(); + let store_type = StorageType::build_storage_types_based_on_env(); + info!("store type: {:?}", store_type); + for store in store_type { + info!( + "Running admin GC missing cases test with storage type: {}", + store + ); + test_admin_gc_missing_cases(&store).await; + } +} + +async fn test_admin_gc_missing_cases(store_type: &StorageType) { + let (test_context, _guard) = distributed_with_gc(store_type).await; + let instance = test_context.frontend(); + let metasrv = test_context.metasrv(); + + // Case 1: Invalid table name + let invalid_table_sql = "ADMIN GC_TABLE('no_such_table')"; + let invalid_table_result = try_execute_sql(&instance, invalid_table_sql).await; + assert!( + invalid_table_result.is_err(), + "Expected error for invalid table name" + ); + + // Case 2: Invalid region id (same table, non-existent region number) + let invalid_region_table = "test_admin_gc_invalid_region"; + create_append_table(&instance, invalid_region_table, None).await; + let table_id = get_table_id(&instance, invalid_region_table).await; + let region_ids = fetch_region_ids(&instance, invalid_region_table).await; + let base_region = RegionId::from_u64(*region_ids.first().expect("region id exists")); + let invalid_region = RegionId::new(table_id, base_region.region_number().saturating_add(100)); + let invalid_region_sql = format!("ADMIN GC_REGIONS({})", invalid_region.as_u64()); + let sst_before_invalid_region = list_sst_files(&test_context).await; + let invalid_region_result = try_execute_sql(&instance, &invalid_region_sql).await; + match invalid_region_result { + Ok(output) => { + let processed_regions = extract_u64_output(output.data).await; + assert_eq!(processed_regions, 0); + let sst_after_invalid_region = list_sst_files(&test_context).await; + assert_eq!( + sst_before_invalid_region.len(), + sst_after_invalid_region.len() + ); + } + Err(_) => { + let sst_after_invalid_region = list_sst_files(&test_context).await; + assert_eq!( + sst_before_invalid_region.len(), + sst_after_invalid_region.len() + ); + } + } + + // Case 3: No garbage to collect (idempotent) + let no_garbage_table = "test_admin_gc_no_garbage"; + create_append_table(&instance, no_garbage_table, None).await; + insert_and_flush(&instance, no_garbage_table, 1).await; + let sst_before_gc = list_sst_files(&test_context).await; + let no_garbage_gc_sql = format!("ADMIN GC_TABLE('{no_garbage_table}')"); + let no_garbage_output = execute_sql(&instance, &no_garbage_gc_sql).await; + let processed_regions = extract_u64_output(no_garbage_output.data).await; + assert_eq!(processed_regions, 1); + let sst_after_gc = list_sst_files(&test_context).await; + assert_eq!(sst_before_gc.len(), sst_after_gc.len()); + + // Case 4: Multi-region table, multi-region GC + let multi_region_table = "test_admin_gc_multi_region"; + let partition_clause = + Some("PARTITION ON COLUMNS (host) (host < 'm', host >= 'm')".to_string()); + create_append_table(&instance, multi_region_table, partition_clause.as_deref()).await; + insert_and_flush(&instance, multi_region_table, 4).await; + let sst_before_compaction = list_sst_files(&test_context).await; + let compact_sql = format!("ADMIN COMPACT_TABLE('{multi_region_table}')"); + execute_sql(&instance, &compact_sql).await; + let sst_after_compaction = list_sst_files(&test_context).await; + assert!(sst_after_compaction.len() >= sst_before_compaction.len()); + let region_ids = fetch_region_ids(&instance, multi_region_table).await; + let gc_regions_sql = format!( + "ADMIN GC_REGIONS({})", + region_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(",") + ); + let gc_regions_output = execute_sql(&instance, &gc_regions_sql).await; + let processed_regions = extract_u64_output(gc_regions_output.data).await; + assert_eq!(processed_regions as usize, region_ids.len()); + let sst_after_gc = list_sst_files(&test_context).await; + assert!(sst_after_gc.len() < sst_after_compaction.len()); + + // Case 5: Manual GC on dropped region + let dropped_region_table = "test_admin_gc_dropped_region"; + create_append_table(&instance, dropped_region_table, partition_clause.as_deref()).await; + let dropped_table_id = get_table_id(&instance, dropped_region_table).await; + let (_routes, regions) = + super::get_table_route(metasrv.table_metadata_manager(), dropped_table_id).await; + let base_region = *regions.first().expect("table has at least one region"); + let dropped_region = RegionId::new( + dropped_table_id, + base_region.region_number().saturating_add(100), + ); + let dst_region = RegionId::new( + dropped_table_id, + base_region.region_number().saturating_add(200), + ); + let repart_mgr = metasrv.table_metadata_manager().table_repart_manager(); + let current = repart_mgr + .get_with_raw_bytes(dropped_table_id) + .await + .unwrap(); + let mut repart_value = TableRepartValue::new(); + repart_value.update_mappings(dropped_region, &[dst_region]); + repart_mgr + .upsert_value(dropped_table_id, current, &repart_value) + .await + .unwrap(); + let dropped_gc_sql = format!("ADMIN GC_REGIONS({})", dropped_region.as_u64()); + let dropped_gc_output = execute_sql(&instance, &dropped_gc_sql).await; + let processed_regions = extract_u64_output(dropped_gc_output.data).await; + assert_eq!(processed_regions, 1); + + // Case 6: Cooldown bypass for manual GC (should still process regions on immediate re-run) + let cooldown_region = *region_ids.first().expect("region id exists"); + let cooldown_gc_sql = format!("ADMIN GC_REGIONS({cooldown_region})"); + execute_sql(&instance, &cooldown_gc_sql).await; + let cooldown_gc_output = execute_sql(&instance, &cooldown_gc_sql).await; + let processed_regions = extract_u64_output(cooldown_gc_output.data).await; + assert_eq!(processed_regions, 1); +} + +async fn create_append_table( + instance: &std::sync::Arc, + table_name: &str, + partition_clause: Option<&str>, +) { + let partition_clause = partition_clause.unwrap_or(""); + let create_table_sql = format!( + "\ + CREATE TABLE {table_name} (\ + ts TIMESTAMP TIME INDEX,\ + val DOUBLE,\ + host STRING\ + ) {partition_clause} WITH (append_mode = 'true')\ + " + ); + execute_sql(instance, &create_table_sql).await; +} + +async fn insert_and_flush( + instance: &std::sync::Arc, + table_name: &str, + rounds: usize, +) { + for i in 0..rounds { + let insert_sql = format!( + "\ + INSERT INTO {table_name} (ts, val, host) VALUES\ + ('2023-01-0{} 10:00:00', {}, 'host{}'),\ + ('2023-01-0{} 11:00:00', {}, 'host{}'),\ + ('2023-01-0{} 12:00:00', {}, 'host{}')\ + ", + i + 1, + 10.0 + i as f64, + i, + i + 1, + 20.0 + i as f64, + i, + i + 1, + 30.0 + i as f64, + i + ); + execute_sql(instance, &insert_sql).await; + let flush_sql = format!("ADMIN FLUSH_TABLE('{table_name}')"); + execute_sql(instance, &flush_sql).await; + } +} + +async fn fetch_region_ids( + instance: &std::sync::Arc, + table_name: &str, +) -> Vec { + let region_id_sql = format!( + "SELECT greptime_partition_id FROM information_schema.partitions WHERE table_name = '{}' ORDER BY greptime_partition_id", + table_name + ); + let region_output = execute_sql(instance, ®ion_id_sql).await; + let OutputData::Stream(region_stream) = region_output.data else { + panic!("Expected stream output for region id query"); + }; + let batches = RecordBatches::try_collect(region_stream).await.unwrap(); + let mut region_ids = Vec::new(); + for batch in batches.iter() { + let column = batch.column(0); + let array = column.as_primitive::(); + for idx in 0..array.len() { + if array.is_valid(idx) { + region_ids.push(array.value(idx)); + } + } + } + region_ids +} + +async fn get_table_id( + instance: &std::sync::Arc, + table_name: &str, +) -> table::metadata::TableId { + let table = instance + .catalog_manager() + .table("greptime", "public", table_name, None) + .await + .unwrap() + .unwrap(); + table.table_info().table_id() +} + +async fn extract_u64_output(output: OutputData) -> u64 { + let recordbatches = match output { + OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(), + OutputData::RecordBatches(recordbatches) => recordbatches, + _ => panic!("Unexpected output type"), + }; + let batch = recordbatches.iter().next().expect("non-empty recordbatch"); + let column = batch.column(0); + let array = column.as_primitive::(); + array.value(0) +} diff --git a/tests-integration/tests/repartition.rs b/tests-integration/tests/repartition.rs index 4347417b9a..ce8ec5bf96 100644 --- a/tests-integration/tests/repartition.rs +++ b/tests-integration/tests/repartition.rs @@ -125,8 +125,17 @@ async fn trigger_table_gc(metasrv: &Arc, table_name: &str) { async fn trigger_full_gc(ticker: &GcTickerRef) { info!("triggering full gc"); let (tx, rx) = oneshot::channel(); - ticker.sender.send(gc::Event::Manually(tx)).await.unwrap(); - let _ = rx.await.unwrap(); + ticker + .sender + .send(gc::Event::Manually { + sender: tx, + region_ids: None, + full_file_listing: None, + timeout: None, + }) + .await + .unwrap(); + let _ = rx.await.unwrap().unwrap(); } fn query_partitions_sql(table_name: &str) -> String {