feat: admin gc table/regions (#7619)

* feat: gc table

Signed-off-by: discord9 <discord9@163.com>

* test: admin gc

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* refactor: GcStats

Signed-off-by: discord9 <discord9@163.com>

* refactor: use gc ticker for admin gc

Signed-off-by: discord9 <discord9@163.com>

* fix: region routes override

Signed-off-by: discord9 <discord9@163.com>

* test: non happy path

Signed-off-by: discord9 <discord9@163.com>

* refactor: gc job report enum

Signed-off-by: discord9 <discord9@163.com>

* test: process 0 regions

Signed-off-by: discord9 <discord9@163.com>

* after rebase

Signed-off-by: discord9 <discord9@163.com>

* feat: allow manual gc to return error

Signed-off-by: discord9 <discord9@163.com>

* chore: update proto

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* chore: timeout and update proto

Signed-off-by: discord9 <discord9@163.com>

* chore: udpate proto

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-06 16:25:44 +08:00
committed by GitHub
parent 5e6d2b221e
commit 56ee8baa3f
29 changed files with 1687 additions and 132 deletions

2
Cargo.lock generated
View File

@@ -5690,7 +5690,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4010e25ea09bf0a3ef359b946376da67d0323c5d#4010e25ea09bf0a3ef359b946376da67d0323c5d"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=092ba1d01e2da676dca66cca7eebb55009da8ef8#092ba1d01e2da676dca66cca7eebb55009da8ef8"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",

View File

@@ -154,7 +154,7 @@ etcd-client = { version = "0.17", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4010e25ea09bf0a3ef359b946376da67d0323c5d" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "092ba1d01e2da676dca66cca7eebb55009da8ef8" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -15,6 +15,7 @@
mod build_index_table;
mod flush_compact_region;
mod flush_compact_table;
mod gc;
mod migrate_region;
mod reconcile_catalog;
mod reconcile_database;
@@ -22,6 +23,7 @@ mod reconcile_table;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use gc::{GcRegionsFunction, GcTableFunction};
use migrate_region::MigrateRegionFunction;
use reconcile_catalog::ReconcileCatalogFunction;
use reconcile_database::ReconcileDatabaseFunction;
@@ -42,6 +44,8 @@ impl AdminFunction {
registry.register(CompactRegionFunction::factory());
registry.register(FlushTableFunction::factory());
registry.register(CompactTableFunction::factory());
registry.register(GcRegionsFunction::factory());
registry.register(GcTableFunction::factory());
registry.register(BuildIndexFunction::factory());
registry.register(FlushFlowFunction::factory());
registry.register(ReconcileCatalogFunction::factory());

View File

@@ -0,0 +1,220 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_meta::rpc::procedure::{GcRegionsRequest, GcTableRequest};
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu,
UnsupportedInputDataTypeSnafu,
};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::prelude::*;
use session::context::QueryContextRef;
use snafu::{ResultExt, ensure};
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
const DEFAULT_GC_TIMEOUT: Duration = Duration::from_secs(60);
const DEFAULT_FULL_FILE_LISTING: bool = false;
#[admin_fn(
name = GcRegionsFunction,
display_name = gc_regions,
sig_fn = gc_regions_signature,
ret = uint64
)]
pub(crate) async fn gc_regions(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (region_ids, full_file_listing) = parse_gc_regions_params(params)?;
let resp = procedure_service_handler
.gc_regions(GcRegionsRequest {
region_ids,
full_file_listing,
timeout: DEFAULT_GC_TIMEOUT,
})
.await?;
Ok(Value::from(resp.processed_regions))
}
#[admin_fn(
name = GcTableFunction,
display_name = gc_table,
sig_fn = gc_table_signature,
ret = uint64
)]
pub(crate) async fn gc_table(
procedure_service_handler: &ProcedureServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (catalog_name, schema_name, table_name, full_file_listing) =
parse_gc_table_params(params, query_ctx)?;
let resp = procedure_service_handler
.gc_table(GcTableRequest {
catalog_name,
schema_name,
table_name,
full_file_listing,
timeout: DEFAULT_GC_TIMEOUT,
})
.await?;
Ok(Value::from(resp.processed_regions))
}
fn parse_gc_regions_params(params: &[ValueRef<'_>]) -> Result<(Vec<u64>, bool)> {
ensure!(
!params.is_empty(),
InvalidFuncArgsSnafu {
err_msg: "The length of the args is not correct, expect at least 1 region id, have 0"
.to_string(),
}
);
let (full_file_listing, region_params) = match params.last() {
Some(ValueRef::Boolean(value)) => (*value, &params[..params.len() - 1]),
_ => (DEFAULT_FULL_FILE_LISTING, params),
};
ensure!(
!region_params.is_empty(),
InvalidFuncArgsSnafu {
err_msg: "The length of the args is not correct, expect at least 1 region id"
.to_string(),
}
);
let mut region_ids = Vec::with_capacity(region_params.len());
for param in region_params {
let Some(region_id) = cast_u64(param)? else {
return UnsupportedInputDataTypeSnafu {
function: "gc_regions",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
region_ids.push(region_id);
}
Ok((region_ids, full_file_listing))
}
fn parse_gc_table_params(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String, String, bool)> {
ensure!(
matches!(params.len(), 1 | 2),
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1 or 2, have: {}",
params.len()
),
}
);
let ValueRef::String(table_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "gc_table",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let full_file_listing = if params.len() == 2 {
let ValueRef::Boolean(value) = params[1] else {
return UnsupportedInputDataTypeSnafu {
function: "gc_table",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
value
} else {
DEFAULT_FULL_FILE_LISTING
};
let (catalog_name, schema_name, table_name) =
session::table_name::table_name_to_full_name(table_name, query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;
Ok((catalog_name, schema_name, table_name, full_file_listing))
}
fn gc_regions_signature() -> Signature {
Signature::variadic_any(Volatility::Immutable)
}
fn gc_table_signature() -> Signature {
Signature::one_of(
vec![
TypeSignature::Uniform(1, vec![ArrowDataType::Utf8]),
TypeSignature::Exact(vec![ArrowDataType::Utf8, ArrowDataType::Boolean]),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use session::context::QueryContext;
use super::*;
#[test]
fn test_parse_gc_regions_params_with_full_file_listing() {
let params = vec![
ValueRef::UInt64(1),
ValueRef::UInt64(2),
ValueRef::Boolean(true),
];
let (region_ids, full_file_listing) = parse_gc_regions_params(&params).unwrap();
assert_eq!(region_ids, vec![1, 2]);
assert!(full_file_listing);
}
#[test]
fn test_parse_gc_regions_params_default_full_file_listing() {
let params = vec![ValueRef::UInt64(1), ValueRef::UInt32(2)];
let (region_ids, full_file_listing) = parse_gc_regions_params(&params).unwrap();
assert_eq!(region_ids, vec![1, 2]);
assert!(!full_file_listing);
}
#[test]
fn test_parse_gc_table_params_with_full_file_listing() {
let params = vec![ValueRef::String("public.t"), ValueRef::Boolean(true)];
let (catalog, schema, table, full_file_listing) =
parse_gc_table_params(&params, &QueryContext::arc()).unwrap();
assert_eq!(catalog, "greptime");
assert_eq!(schema, "public");
assert_eq!(table, "t");
assert!(full_file_listing);
}
}

View File

@@ -19,7 +19,9 @@ use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest,
ProcedureStateResponse,
};
use common_query::Output;
use common_query::error::Result;
@@ -85,6 +87,12 @@ pub trait ProcedureServiceHandler: Send + Sync {
/// Get the catalog manager
fn catalog_manager(&self) -> &CatalogManagerRef;
/// Manually trigger GC for specific regions.
async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse>;
/// Manually trigger GC for a table.
async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse>;
}
/// This flow service handler is only use for flush flow for now.

View File

@@ -37,7 +37,8 @@ impl FunctionState {
use catalog::CatalogManagerRef;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
MigrateRegionRequest, ProcedureStateResponse,
};
use common_query::Output;
use common_query::error::Result;
@@ -82,6 +83,24 @@ impl FunctionState {
Ok(())
}
async fn gc_regions(&self, _request: GcRegionsRequest) -> Result<GcResponse> {
Ok(GcResponse {
processed_regions: 1,
need_retry_regions: vec![],
deleted_files: 0,
deleted_indexes: 0,
})
}
async fn gc_table(&self, _request: GcTableRequest) -> Result<GcResponse> {
Ok(GcResponse {
processed_regions: 1,
need_retry_regions: vec![],
deleted_files: 0,
deleted_indexes: 0,
})
}
fn catalog_manager(&self) -> &CatalogManagerRef {
unimplemented!()
}

View File

@@ -25,8 +25,8 @@ use crate::error::{
};
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{
self, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
ProcedureStateResponse,
self, GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
};
/// The context of procedure executor.
@@ -78,6 +78,30 @@ pub trait ProcedureExecutor: Send + Sync {
pid: &str,
) -> Result<ProcedureStateResponse>;
/// Manually trigger GC for the specified regions.
async fn gc_regions(
&self,
_ctx: &ExecutorContext,
_request: GcRegionsRequest,
) -> Result<GcResponse> {
UnsupportedSnafu {
operation: "gc_regions",
}
.fail()
}
/// Manually trigger GC for the specified table.
async fn gc_table(
&self,
_ctx: &ExecutorContext,
_request: GcTableRequest,
) -> Result<GcResponse> {
UnsupportedSnafu {
operation: "gc_table",
}
.fail()
}
async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
}

View File

@@ -78,6 +78,30 @@ pub struct RemoveRegionFollowerRequest {
pub peer_id: u64,
}
#[derive(Debug, Clone)]
pub struct GcRegionsRequest {
pub region_ids: Vec<u64>,
pub full_file_listing: bool,
pub timeout: Duration,
}
#[derive(Debug, Clone)]
pub struct GcTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub full_file_listing: bool,
pub timeout: Duration,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GcResponse {
pub processed_regions: u64,
pub need_retry_regions: Vec<u64>,
pub deleted_files: u64,
pub deleted_indexes: u64,
}
/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {

View File

@@ -105,13 +105,9 @@ impl InstructionHandler for GcRegionsHandler {
// Merge reports
let mut merged_report = GcReport::default();
for report in reports {
merged_report
.deleted_files
.extend(report.deleted_files.into_iter());
merged_report
.deleted_indexes
.extend(report.deleted_indexes.into_iter());
merged_report.merge(report);
}
Ok(merged_report)
}
.instrument(common_telemetry::tracing::info_span!("gc_worker_run")),

View File

@@ -48,9 +48,9 @@ use common_meta::range_stream::PaginationStream;
use common_meta::rpc::KeyValue;
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse,
GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -387,6 +387,28 @@ impl ProcedureExecutor for MetaClient {
.context(meta_error::ExternalSnafu)
}
async fn gc_regions(
&self,
_ctx: &ExecutorContext,
request: GcRegionsRequest,
) -> MetaResult<GcResponse> {
self.gc_regions(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
async fn gc_table(
&self,
_ctx: &ExecutorContext,
request: GcTableRequest,
) -> MetaResult<GcResponse> {
self.gc_table(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
self.procedure_client()
.map_err(BoxedError::new)
@@ -715,6 +737,16 @@ impl MetaClient {
self.procedure_client()?.reconcile(request).await
}
/// Manually trigger GC for specific regions.
pub async fn gc_regions(&self, request: GcRegionsRequest) -> Result<GcResponse> {
self.procedure_client()?.gc_regions(request).await
}
/// Manually trigger GC for a table (all its regions).
pub async fn gc_table(&self, request: GcTableRequest) -> Result<GcResponse> {
self.procedure_client()?.gc_table(request).await
}
/// Submit a DDL task
pub async fn submit_ddl_task(
&self,

View File

@@ -18,11 +18,16 @@ use std::time::Duration;
use api::v1::meta::procedure_service_client::ProcedureServiceClient;
use api::v1::meta::{
DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse,
QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role,
DdlTaskRequest, DdlTaskResponse, GcRegionsRequest, GcRegionsResponse, GcTableRequest,
GcTableResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest,
ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, QueryProcedureRequest,
ReconcileRequest, ReconcileResponse, RequestHeader, ResponseHeader, Role,
};
use common_grpc::channel_manager::ChannelManager;
use common_meta::rpc::procedure::{
GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
GcTableRequest as MetaGcTableRequest,
};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use snafu::{ResultExt, ensure};
@@ -105,6 +110,16 @@ impl Client {
let inner = self.inner.read().await;
inner.list_procedures().await
}
pub async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
let inner = self.inner.read().await;
inner.gc_regions(request).await
}
pub async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
let inner = self.inner.read().await;
inner.gc_table(request).await
}
}
#[derive(Debug)]
@@ -260,6 +275,78 @@ impl Inner {
.await
}
async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
let timeout = request.timeout;
let req = GcRegionsRequest {
header: Some(RequestHeader {
protocol_version: 0,
member_id: self.id,
role: self.role as i32,
tracing_context: TracingContext::from_current_span().to_w3c(),
}),
region_ids: request.region_ids,
full_file_listing: request.full_file_listing,
timeout_secs: timeout.as_secs() as u32,
};
let resp: GcRegionsResponse = self
.with_retry(
"gc_regions",
move |mut client| {
let mut req = Request::new(req.clone());
req.set_timeout(timeout);
async move { client.gc_regions(req).await.map(|res| res.into_inner()) }
},
|resp: &GcRegionsResponse| &resp.header,
)
.await?;
let stats = resp.stats.unwrap_or_default();
Ok(MetaGcResponse {
processed_regions: stats.processed_regions,
need_retry_regions: stats.need_retry_regions,
deleted_files: stats.deleted_files,
deleted_indexes: stats.deleted_indexes,
})
}
async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
let timeout = request.timeout;
let req = GcTableRequest {
header: Some(RequestHeader {
protocol_version: 0,
member_id: self.id,
role: self.role as i32,
tracing_context: TracingContext::from_current_span().to_w3c(),
}),
catalog_name: request.catalog_name,
schema_name: request.schema_name,
table_name: request.table_name,
full_file_listing: request.full_file_listing,
timeout_secs: timeout.as_secs() as u32,
};
let resp: GcTableResponse = self
.with_retry(
"gc_table",
move |mut client| {
let mut req = Request::new(req.clone());
req.set_timeout(timeout);
async move { client.gc_table(req).await.map(|res| res.into_inner()) }
},
|resp: &GcTableResponse| &resp.header,
)
.await?;
let stats = resp.stats.unwrap_or_default();
Ok(MetaGcResponse {
processed_regions: stats.processed_regions,
need_retry_regions: stats.need_retry_regions,
deleted_files: stats.deleted_files,
deleted_indexes: stats.deleted_indexes,
})
}
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
let mut req = QueryProcedureRequest {
pid: Some(ProcedureId { key: pid.into() }),
@@ -333,10 +420,11 @@ mod tests {
use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer};
use api::v1::meta::procedure_service_server::{ProcedureService, ProcedureServiceServer};
use api::v1::meta::{
AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, HeartbeatRequest,
HeartbeatResponse, MigrateRegionRequest, MigrateRegionResponse, Peer,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role,
AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, GcRegionsRequest,
GcRegionsResponse, GcTableRequest, GcTableResponse, HeartbeatRequest, HeartbeatResponse,
MigrateRegionRequest, MigrateRegionResponse, Peer, ProcedureDetailRequest,
ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ReconcileRequest,
ReconcileResponse, ResponseHeader, Role,
};
use async_trait::async_trait;
use common_error::status_code::StatusCode;
@@ -435,6 +523,20 @@ mod tests {
) -> Result<Response<ProcedureDetailResponse>, Status> {
Err(Status::unimplemented("details is not used in this test"))
}
async fn gc_regions(
&self,
_request: Request<GcRegionsRequest>,
) -> Result<Response<GcRegionsResponse>, Status> {
Err(Status::unimplemented("gc_regions is not used in this test"))
}
async fn gc_table(
&self,
_request: Request<GcTableRequest>,
) -> Result<Response<GcTableResponse>, Status> {
Err(Status::unimplemented("gc_table is not used in this test"))
}
}
#[tokio::test(flavor = "multi_thread")]

View File

@@ -31,7 +31,7 @@ mod util;
pub use options::GcSchedulerOptions;
pub use procedure::BatchGcProcedure;
pub use scheduler::{Event, GcScheduler, GcTickerRef};
pub use scheduler::{Event, GcJobReport, GcScheduler, GcTickerRef};
/// Mapping from region ID to its associated peers (leader and followers).
pub type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;

View File

@@ -88,6 +88,16 @@ impl<'a> DroppedRegionCollector<'a> {
pub async fn collect_and_assign(
&self,
table_reparts: &[(TableId, TableRepartValue)],
) -> Result<DroppedRegionAssignment> {
self.collect_and_assign_with_cooldown(table_reparts, true)
.await
}
/// Collect and assign dropped regions for GC with optional cooldown filtering.
pub async fn collect_and_assign_with_cooldown(
&self,
table_reparts: &[(TableId, TableRepartValue)],
apply_cooldown: bool,
) -> Result<DroppedRegionAssignment> {
// get active region ids for all tables involved in repartitioning
let active_region_ids: HashSet<RegionId> = {
@@ -113,7 +123,11 @@ impl<'a> DroppedRegionCollector<'a> {
return Ok(DroppedRegionAssignment::default());
}
let dropped_regions = self.filter_by_cooldown(dropped_regions).await;
let dropped_regions = if apply_cooldown {
self.filter_by_cooldown(dropped_regions).await
} else {
dropped_regions
};
if dropped_regions.is_empty() {
return Ok(DroppedRegionAssignment::default());

View File

@@ -114,12 +114,27 @@ impl GcScheduler {
.await;
let duration = start_time.elapsed();
info!(
"Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
report.per_datanode_reports.len(),
report.failed_datanodes.len(),
duration
);
match &report {
GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
info!(
"Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
per_datanode_reports.len(),
failed_datanodes.len(),
duration
);
}
GcJobReport::Combined { report } => {
info!(
"Finished GC cycle with combined report. Deleted files: {}, deleted indexes: {}. Duration: {:?}",
report.deleted_files.len(),
report.deleted_indexes.len(),
duration
);
}
}
debug!("Detailed GC Job Report: {report:#?}");
Ok(report)
@@ -196,7 +211,8 @@ impl GcScheduler {
force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
) -> GcJobReport {
let mut report = GcJobReport::default();
let mut per_datanode_reports = HashMap::new();
let mut failed_datanodes: HashMap<_, Vec<_>> = HashMap::new();
// Create a stream of datanode GC tasks with limited concurrency
let results: Vec<_> = futures::stream::iter(
@@ -237,18 +253,21 @@ impl GcScheduler {
for (peer, result) in results {
match result {
Ok(dn_report) => {
report.per_datanode_reports.insert(peer.id, dn_report);
per_datanode_reports.insert(peer.id, dn_report);
}
Err(e) => {
error!(e; "Failed to process datanode GC for peer {}", peer);
// Note: We don't have a direct way to map peer to table_id here,
// so we just log the error. The table_reports will contain individual region failures.
report.failed_datanodes.entry(peer.id).or_default().push(e);
failed_datanodes.entry(peer.id).or_default().push(e);
}
}
}
report
GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
}
}
/// Process GC for a single datanode with all its candidate regions.

View File

@@ -50,13 +50,16 @@ pub const TEST_REGION_SIZE_200MB: u64 = 200_000_000;
/// Helper function to create an empty GcReport for the given region IDs
pub fn new_empty_report_with(region_ids: impl IntoIterator<Item = RegionId>) -> GcReport {
let mut deleted_files = HashMap::new();
let mut processed_regions = HashSet::new();
for region_id in region_ids {
deleted_files.insert(region_id, vec![]);
processed_regions.insert(region_id);
}
GcReport {
deleted_files,
deleted_indexes: HashMap::new(),
need_retry_regions: HashSet::new(),
processed_regions,
}
}

View File

@@ -34,8 +34,18 @@ async fn test_parallel_process_datanodes_empty() {
.parallel_process_datanodes(HashMap::new(), HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 0);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 0);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
}
#[tokio::test]
@@ -88,8 +98,18 @@ async fn test_parallel_process_datanodes_with_candidates() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
}
#[tokio::test]
@@ -140,16 +160,18 @@ async fn test_handle_tick() {
let report = scheduler.handle_tick().await.unwrap();
// Validate the returned GcJobReport
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have 0 failed datanodes"
);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode");
assert_eq!(failed_datanodes.len(), 0, "Should have 0 failed datanodes");
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1);
assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1);

View File

@@ -100,8 +100,18 @@ async fn test_concurrent_table_processing_limits() {
.await;
// Should process all datanodes
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
}
#[tokio::test]
@@ -172,11 +182,21 @@ async fn test_datanode_processes_tables_with_partial_gc_failures() {
.await;
// Should have one datanode with mixed results
assert_eq!(report.per_datanode_reports.len(), 1);
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// also check one failed region (region2 has no GC report, so it should be in need_retry_regions)
let datanode_report = report.per_datanode_reports.values().next().unwrap();
assert_eq!(datanode_report.need_retry_regions.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
}
// Region Concurrency Tests
@@ -376,7 +396,15 @@ async fn test_region_gc_concurrency_with_partial_failures() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
let report = report.per_datanode_reports.get(&peer.id).unwrap();
let report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
..
} => per_datanode_reports.get(&peer.id).unwrap(),
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// Should have 3 successful and 3 failed regions
// Even regions (2, 4, 6) should succeed, odd regions (1, 3, 5) should fail
@@ -506,7 +534,15 @@ async fn test_region_gc_concurrency_with_retryable_errors() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
let report = report.per_datanode_reports.get(&peer.id).unwrap();
let report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
..
} => per_datanode_reports.get(&peer.id).unwrap(),
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// In the new implementation without retry logic, all regions should be processed
// The exact behavior depends on how the mock handles the regions

View File

@@ -93,19 +93,30 @@ async fn test_gc_regions_failure_handling() {
let report = scheduler.handle_tick().await.unwrap();
// Validate the report shows the failure handling
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode despite failure"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled via need_retry_regions)"
);
// Check the report shows the failure handling
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(
per_datanode_reports.len(),
1,
"Should process 1 datanode despite failure"
);
assert_eq!(
failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled via need_retry_regions)"
);
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// Check that the region is in need_retry_regions due to the failure
let datanode_report = report.per_datanode_reports.values().next().unwrap();
assert_eq!(
datanode_report.need_retry_regions.len(),
1,
@@ -180,19 +191,25 @@ async fn test_get_file_references_failure() {
// Validate the report shows the expected results
// In the new implementation, even if get_file_references fails, we still create a datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled gracefully)"
);
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode");
assert_eq!(
failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled gracefully)"
);
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// The region should be processed but may have empty results due to file refs failure
let datanode_report = report.per_datanode_reports.values().next().unwrap();
// The current implementation still processes the region even with file refs failure
// and creates an empty entry in deleted_files
assert!(

View File

@@ -86,19 +86,19 @@ async fn test_full_gc_workflow() {
let report = scheduler.handle_tick().await.unwrap();
// Validate the returned GcJobReport - should have 1 datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have no failed datanodes"
);
// Get the datanode report
let datanode_report = report.per_datanode_reports.values().next().unwrap();
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode");
assert_eq!(failed_datanodes.len(), 0, "Should have no failed datanodes");
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// Check that the region was processed successfully
assert!(
@@ -216,19 +216,19 @@ async fn test_tracker_cleanup() {
let report = scheduler.handle_tick().await.unwrap();
// Validate the returned GcJobReport - should have 1 datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have no failed datanodes"
);
// Get the datanode report
let datanode_report = report.per_datanode_reports.values().next().unwrap();
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode");
assert_eq!(failed_datanodes.len(), 0, "Should have no failed datanodes");
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// Check that the region was processed successfully
assert!(

View File

@@ -73,8 +73,18 @@ async fn test_empty_file_refs_manifest() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
// Should handle empty file refs gracefully
}
@@ -150,6 +160,16 @@ async fn test_multiple_regions_per_table() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
}

View File

@@ -12,23 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use common_meta::DatanodeId;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::tracing::Instrument as _;
use common_telemetry::{error, info};
use store_api::storage::GcReport;
use store_api::storage::{GcReport, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, oneshot};
use crate::cluster::MetaPeerClientRef;
use crate::define_ticker;
use crate::error::{Error, Result};
use crate::gc::Region2Peers;
use crate::gc::ctx::{DefaultGcSchedulerCtx, SchedulerCtx};
use crate::gc::dropped::DroppedRegionCollector;
use crate::gc::options::{GcSchedulerOptions, TICKER_INTERVAL};
use crate::gc::tracker::RegionGcTracker;
use crate::metrics::{
@@ -37,10 +39,46 @@ use crate::metrics::{
use crate::service::mailbox::MailboxRef;
/// Report for a GC job.
#[derive(Debug, Default)]
pub struct GcJobReport {
pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
#[derive(Debug)]
pub enum GcJobReport {
PerDatanode {
per_datanode_reports: HashMap<DatanodeId, GcReport>,
failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
},
Combined {
report: GcReport,
},
}
impl Default for GcJobReport {
fn default() -> Self {
Self::PerDatanode {
per_datanode_reports: HashMap::new(),
failed_datanodes: HashMap::new(),
}
}
}
impl GcJobReport {
pub fn combined(report: GcReport) -> Self {
Self::Combined { report }
}
pub fn merge_to_report(self) -> GcReport {
match self {
GcJobReport::Combined { report } => report,
GcJobReport::PerDatanode {
per_datanode_reports,
..
} => {
let mut combined = GcReport::default();
for (_datanode_id, report) in per_datanode_reports {
combined.merge(report);
}
combined
}
}
}
}
/// [`Event`] represents various types of events that can be processed by the gc ticker.
@@ -48,10 +86,20 @@ pub struct GcJobReport {
/// Variants:
/// - `Tick`: This event is used to trigger gc periodically.
/// - `Manually`: This event is used to trigger a manual gc run and provides a channel
/// to send back the [`GcJobReport`] for that run.
/// to send back the result for that run.
/// Optional parameters allow specifying target regions and GC behavior.
pub enum Event {
Tick,
Manually(oneshot::Sender<GcJobReport>),
Manually {
/// Channel sender to return the GC job report or error
sender: oneshot::Sender<Result<GcJobReport>>,
/// Optional specific region IDs to GC. If None, scheduler will select candidates automatically.
region_ids: Option<Vec<RegionId>>,
/// Optional override for full file listing. If None, uses scheduler config.
full_file_listing: Option<bool>,
/// Optional override for timeout. If None, uses scheduler config.
timeout: Option<Duration>,
},
}
#[allow(unused)]
@@ -130,19 +178,23 @@ impl GcScheduler {
error!(e; "Failed to handle gc tick");
}
}
Event::Manually(sender) => {
Event::Manually {
sender,
region_ids,
full_file_listing,
timeout,
} => {
info!("Received manually gc request");
let span =
common_telemetry::tracing::info_span!("meta_gc_tick", trigger = "manual");
match self.handle_tick().instrument(span).await {
Ok(report) => {
// ignore error
let _ = sender.send(report);
}
Err(e) => {
error!(e; "Failed to handle gc tick");
}
};
let result = self
.handle_manual_gc(region_ids, full_file_listing, timeout)
.instrument(span)
.await;
if let Err(e) = &result {
error!(e; "Failed to handle manual gc");
}
let _ = sender.send(result);
}
}
}
@@ -162,4 +214,162 @@ impl GcScheduler {
Ok(report)
}
/// Handles a manual GC request with optional specific parameters.
///
/// If `region_ids` is specified, GC will be performed only on those regions.
/// Otherwise, falls back to automatic candidate selection.
pub(crate) async fn handle_manual_gc(
&self,
region_ids: Option<Vec<RegionId>>,
full_file_listing: Option<bool>,
timeout: Option<Duration>,
) -> Result<GcJobReport> {
info!("Start to handle manual gc request");
// No specific regions, use default tick behavior
let Some(regions) = region_ids else {
let report = self.trigger_gc().await?;
info!("Finished manual gc request");
return Ok(report);
};
// Empty regions list, return empty report
if regions.is_empty() {
info!("Finished manual gc request");
return Ok(GcJobReport::combined(GcReport::default()));
}
let full_listing = full_file_listing.unwrap_or(false);
let gc_timeout = timeout.unwrap_or(self.config.mailbox_timeout);
let region_set: HashSet<RegionId> = regions.iter().copied().collect();
let table_reparts = self.ctx.get_table_reparts().await?;
let dropped_collector =
DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
let dropped_assignment = dropped_collector
.collect_and_assign_with_cooldown(&table_reparts, false)
.await?;
let mut dropped_region_set = HashSet::new();
let mut dropped_routes_override = Region2Peers::new();
for overrides in dropped_assignment.region_routes_override.into_values() {
for (region_id, route) in overrides {
if region_set.contains(&region_id) {
dropped_region_set.insert(region_id);
dropped_routes_override.insert(region_id, route);
}
}
}
let (dropped_regions, active_regions): (Vec<_>, Vec<_>) = regions
.into_iter()
.partition(|region_id| dropped_region_set.contains(region_id));
let mut combined_report = GcReport::default();
if !active_regions.is_empty() {
let report = self
.ctx
.gc_regions(
&active_regions,
full_listing,
gc_timeout,
Region2Peers::new(),
)
.await?;
combined_report.merge(report);
}
if !dropped_regions.is_empty() {
let report = self
.ctx
.gc_regions(&dropped_regions, true, gc_timeout, dropped_routes_override)
.await?;
combined_report.merge(report);
}
let report = GcJobReport::combined(combined_report);
info!("Finished manual gc request");
Ok(report)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::time::Duration;
use common_meta::datanode::RegionStat;
use common_meta::key::table_repart::TableRepartValue;
use common_meta::key::table_route::PhysicalTableRouteValue;
use store_api::storage::RegionId;
use table::metadata::TableId;
use super::*;
struct ErrorMockSchedulerCtx;
#[async_trait::async_trait]
impl SchedulerCtx for ErrorMockSchedulerCtx {
async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
Ok(HashMap::new())
}
async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
Ok(vec![])
}
async fn get_table_route(
&self,
_table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)> {
unreachable!("get_table_route should not be called in this test")
}
async fn batch_get_table_route(
&self,
_table_ids: &[TableId],
) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
Ok(HashMap::new())
}
async fn gc_regions(
&self,
_region_ids: &[RegionId],
_full_file_listing: bool,
_timeout: Duration,
_region_routes_override: Region2Peers,
) -> Result<GcReport> {
crate::error::UnexpectedSnafu {
violated: "mock gc failure".to_string(),
}
.fail()
}
}
#[tokio::test]
async fn test_handle_manual_gc_propagates_error() {
let (tx, rx) = GcScheduler::channel();
drop(tx);
let scheduler = GcScheduler {
ctx: Arc::new(ErrorMockSchedulerCtx),
receiver: rx,
config: GcSchedulerOptions::default(),
region_gc_tracker: Arc::new(Mutex::new(HashMap::new())),
last_tracker_cleanup: Arc::new(Mutex::new(Instant::now())),
};
let result = scheduler
.handle_manual_gc(
Some(vec![RegionId::new(1, 0)]),
Some(false),
Some(Duration::from_secs(1)),
)
.await;
assert!(result.is_err());
}
}

View File

@@ -16,24 +16,32 @@ use std::time::Duration;
use api::v1::meta::reconcile_request::Target;
use api::v1::meta::{
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, GcRegionsRequest,
GcRegionsResponse, GcStats, GcTableRequest, GcTableResponse, MigrateRegionRequest,
MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_name::TableNameKey;
use common_meta::procedure_executor::ExecutorContext;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
use common_meta::rpc::procedure;
use common_meta::rpc::procedure::{
self, GcRegionsRequest as MetaGcRegionsRequest, GcResponse,
GcTableRequest as MetaGcTableRequest,
};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::table_reference::TableReference;
use tonic::Request;
use crate::error::{TableMetadataManagerSnafu, TableNotFoundSnafu};
use crate::metasrv::Metasrv;
use crate::procedure::region_migration::manager::{
RegionMigrationProcedureTask, RegionMigrationTriggerReason,
};
use crate::service::GrpcResult;
use crate::{check_leader, error};
use crate::{check_leader, error, gc};
#[async_trait::async_trait]
impl procedure_service_server::ProcedureService for Metasrv {
@@ -240,4 +248,210 @@ impl procedure_service_server::ProcedureService for Metasrv {
metas,
)))
}
async fn gc_regions(
&self,
request: Request<GcRegionsRequest>,
) -> GrpcResult<GcRegionsResponse> {
check_leader!(self, request, GcRegionsResponse, "`gc_regions`");
let GcRegionsRequest {
header,
region_ids,
full_file_listing,
timeout_secs,
} = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
let response = self
.handle_gc_regions(MetaGcRegionsRequest {
region_ids,
full_file_listing,
timeout: Duration::from_secs(timeout_secs as u64),
})
.await?;
Ok(Response::new(gc_response_to_regions_pb(response)))
}
async fn gc_table(&self, request: Request<GcTableRequest>) -> GrpcResult<GcTableResponse> {
check_leader!(self, request, GcTableResponse, "`gc_table`");
let GcTableRequest {
header,
catalog_name,
schema_name,
table_name,
full_file_listing,
timeout_secs,
} = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
let response = self
.handle_gc_table(MetaGcTableRequest {
catalog_name,
schema_name,
table_name,
full_file_listing,
timeout: Duration::from_secs(timeout_secs as u64),
})
.await?;
Ok(Response::new(gc_response_to_table_pb(response)))
}
}
impl Metasrv {
fn normalize_gc_timeout(timeout: Duration) -> Option<Duration> {
if timeout.is_zero() {
None
} else {
Some(timeout)
}
}
async fn handle_gc_regions(&self, request: MetaGcRegionsRequest) -> error::Result<GcResponse> {
let region_ids: Vec<RegionId> = request
.region_ids
.into_iter()
.map(RegionId::from_u64)
.collect();
self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout)
.await
}
async fn handle_gc_table(&self, request: MetaGcTableRequest) -> error::Result<GcResponse> {
let table_name_key = TableNameKey::new(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
let table_metadata_manager: &TableMetadataManagerRef = self.table_metadata_manager();
let table_id = table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await
.context(TableMetadataManagerSnafu)?
.context(TableNotFoundSnafu {
name: request.table_name.clone(),
})?
.table_id();
let (_phy_table_id, route) = table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.context(TableMetadataManagerSnafu)?;
let region_ids: Vec<RegionId> = route.region_routes.iter().map(|r| r.region.id).collect();
self.trigger_gc_for_regions(region_ids, request.full_file_listing, request.timeout)
.await
}
/// Triggers manual GC for specified regions and returns the GC response.
async fn trigger_gc_for_regions(
&self,
region_ids: Vec<RegionId>,
full_file_listing: bool,
timeout: Duration,
) -> error::Result<GcResponse> {
let timeout = Self::normalize_gc_timeout(timeout);
let gc_ticker = self.gc_ticker().context(error::UnexpectedSnafu {
violated: "GC ticker not available".to_string(),
})?;
let (tx, rx) = tokio::sync::oneshot::channel();
gc_ticker
.sender
.send(gc::Event::Manually {
sender: tx,
region_ids: Some(region_ids),
full_file_listing: Some(full_file_listing),
timeout,
})
.await
.map_err(|_| {
error::UnexpectedSnafu {
violated: "Failed to send GC event".to_string(),
}
.build()
})?;
let job_report = rx.await.map_err(|_| {
error::UnexpectedSnafu {
violated: "GC job channel closed unexpectedly".to_string(),
}
.build()
})??;
let report = gc_job_report_to_gc_report(job_report);
Ok(gc_report_to_response(&report))
}
}
fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::storage::GcReport {
job_report.merge_to_report()
}
fn gc_report_to_response(report: &store_api::storage::GcReport) -> GcResponse {
let deleted_files = report.deleted_files.values().map(|v| v.len() as u64).sum();
let deleted_indexes = report
.deleted_indexes
.values()
.map(|v| v.len() as u64)
.sum();
GcResponse {
processed_regions: report.processed_regions.len() as u64,
need_retry_regions: report
.need_retry_regions
.iter()
.map(|id| id.as_u64())
.collect(),
deleted_files,
deleted_indexes,
}
}
fn gc_response_to_regions_pb(resp: GcResponse) -> GcRegionsResponse {
GcRegionsResponse {
stats: Some(GcStats {
processed_regions: resp.processed_regions,
need_retry_regions: resp.need_retry_regions,
deleted_files: resp.deleted_files,
deleted_indexes: resp.deleted_indexes,
}),
..Default::default()
}
}
fn gc_response_to_table_pb(resp: GcResponse) -> GcTableResponse {
GcTableResponse {
stats: Some(GcStats {
processed_regions: resp.processed_regions,
need_retry_regions: resp.need_retry_regions,
deleted_files: resp.deleted_files,
deleted_indexes: resp.deleted_indexes,
}),
..Default::default()
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::Metasrv;
#[test]
fn test_normalize_gc_timeout() {
assert_eq!(Metasrv::normalize_gc_timeout(Duration::ZERO), None);
assert_eq!(
Metasrv::normalize_gc_timeout(Duration::from_secs(10)),
Some(Duration::from_secs(10))
);
}
}

View File

@@ -282,6 +282,7 @@ impl LocalGcWorker {
let mut deleted_files = HashMap::new();
let mut deleted_indexes = HashMap::new();
let mut processed_regions = HashSet::new();
let tmp_ref_files = self.read_tmp_ref_files().await?;
for (region_id, region) in &self.regions {
let per_region_time = std::time::Instant::now();
@@ -309,6 +310,7 @@ impl LocalGcWorker {
.collect_vec();
deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
deleted_indexes.insert(*region_id, index_files);
processed_regions.insert(*region_id);
debug!(
"GC for region {} took {} secs.",
region_id,
@@ -323,6 +325,7 @@ impl LocalGcWorker {
deleted_files,
deleted_indexes,
need_retry_regions: HashSet::new(),
processed_regions,
};
Ok(report)
}

View File

@@ -19,7 +19,9 @@ use common_error::ext::BoxedError;
use common_function::handlers::ProcedureServiceHandler;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
use common_meta::rpc::procedure::{
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest,
ProcedureStateResponse,
};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
@@ -90,4 +92,20 @@ impl ProcedureServiceHandler for ProcedureServiceOperator {
fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
async fn gc_regions(&self, request: MetaGcRegionsRequest) -> QueryResult<MetaGcResponse> {
self.procedure_executor
.gc_regions(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
async fn gc_table(&self, request: MetaGcTableRequest) -> QueryResult<MetaGcResponse> {
self.procedure_executor
.gc_table(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
}

View File

@@ -114,6 +114,8 @@ pub struct GcReport {
pub deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
/// Regions that need retry in next gc round, usually because their tmp ref files are outdated
pub need_retry_regions: HashSet<RegionId>,
/// Regions successfully processed in this GC run
pub processed_regions: HashSet<RegionId>,
}
impl GcReport {
@@ -126,6 +128,7 @@ impl GcReport {
deleted_files,
deleted_indexes,
need_retry_regions,
processed_regions: HashSet::new(),
}
}
@@ -139,7 +142,17 @@ impl GcReport {
);
*self_files = dedup.into_iter().collect();
}
for (region, files) in other.deleted_indexes {
let self_files = self.deleted_indexes.entry(region).or_default();
let dedup: HashSet<(FileId, IndexVersion)> = HashSet::from_iter(
std::mem::take(self_files)
.into_iter()
.chain(files.iter().cloned()),
);
*self_files = dedup.into_iter().collect();
}
self.need_retry_regions.extend(other.need_retry_regions);
self.processed_regions.extend(other.processed_regions);
// Remove regions that have succeeded from need_retry_regions
self.need_retry_regions
.retain(|region| !self.deleted_files.contains_key(region));

View File

@@ -927,6 +927,15 @@ pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
.unwrap()
}
pub async fn try_execute_sql(
instance: &Arc<Instance>,
sql: &str,
) -> servers::error::Result<Output> {
SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
.await
.remove(0)
}
pub async fn execute_sql_and_expect(instance: &Arc<Instance>, sql: &str, expected: &str) {
let output = execute_sql(instance, sql).await;
let output = output.data.pretty_print().await;

View File

@@ -33,6 +33,7 @@ use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::{StorageType, TempDirGuard, execute_sql, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, wait_procedure};
mod admin;
mod repart;
/// Helper function to get table route information for GC procedure

View File

@@ -0,0 +1,518 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Integration tests for `ADMIN GC_TABLE` and `ADMIN GC_REGIONS` functions.
use client::OutputData;
use common_meta::key::table_repart::TableRepartValue;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use datatypes::arrow::array::{Array, AsArray};
use datatypes::arrow::datatypes::UInt64Type;
use store_api::storage::RegionId;
use super::{distributed_with_gc, list_sst_files};
use crate::test_util::{StorageType, execute_sql, try_execute_sql};
#[tokio::test]
async fn test_admin_gc_table_different_store() {
let e = dotenv::dotenv();
common_telemetry::init_default_ut_logging();
let store_type = StorageType::build_storage_types_based_on_env();
info!("store type with .env path: {e:?}: {:?}", store_type);
for store in store_type {
info!("Running admin GC table test with storage type: {}", store);
test_admin_gc_table(&store).await;
}
}
/// Test `ADMIN GC_TABLE('table_name')` function
async fn test_admin_gc_table(store_type: &StorageType) {
let (test_context, _guard) = distributed_with_gc(store_type).await;
let instance = test_context.frontend();
// Step 1: Create table with append_mode to easily generate multiple files
let create_table_sql = r#"
CREATE TABLE test_admin_gc_table (
ts TIMESTAMP TIME INDEX,
val DOUBLE,
host STRING
) WITH (append_mode = 'true')
"#;
execute_sql(&instance, create_table_sql).await;
// Step 2: Generate SST files by inserting data and flushing multiple times
for i in 0..4 {
let insert_sql = format!(
r#"
INSERT INTO test_admin_gc_table (ts, val, host) VALUES
('2023-01-0{} 10:00:00', {}, 'host{}'),
('2023-01-0{} 11:00:00', {}, 'host{}'),
('2023-01-0{} 12:00:00', {}, 'host{}')
"#,
i + 1,
10.0 + i as f64,
i,
i + 1,
20.0 + i as f64,
i,
i + 1,
30.0 + i as f64,
i
);
execute_sql(&instance, &insert_sql).await;
// Flush the table to create SST files
let flush_sql = "ADMIN FLUSH_TABLE('test_admin_gc_table')";
execute_sql(&instance, flush_sql).await;
}
// List SST files before compaction (for verification)
let sst_files_before_compaction = list_sst_files(&test_context).await;
info!(
"SST files before compaction: {:?}",
sst_files_before_compaction
);
assert_eq!(sst_files_before_compaction.len(), 4); // 4 files from 4 flushes
// Step 3: Trigger compaction to create garbage SST files
let compact_sql = "ADMIN COMPACT_TABLE('test_admin_gc_table')";
execute_sql(&instance, compact_sql).await;
// List SST files after compaction (should have both old and new files)
let sst_files_after_compaction = list_sst_files(&test_context).await;
info!(
"SST files after compaction: {:?}",
sst_files_after_compaction
);
assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new
// Step 4: Use ADMIN GC_TABLE to clean up garbage files
let gc_table_sql = "ADMIN GC_TABLE('test_admin_gc_table')";
let gc_output = execute_sql(&instance, gc_table_sql).await;
info!("GC_TABLE output: {:?}", gc_output);
// Step 5: Verify GC results
let sst_files_after_gc = list_sst_files(&test_context).await;
info!("SST files after GC: {:?}", sst_files_after_gc);
assert_eq!(sst_files_after_gc.len(), 1); // Only the compacted file should remain after gc
// Verify that data is still accessible
let count_sql = "SELECT COUNT(*) FROM test_admin_gc_table";
let count_output = execute_sql(&instance, count_sql).await;
let expected = r#"
+----------+
| count(*) |
+----------+
| 12 |
+----------+"#
.trim();
check_output_stream(count_output.data, expected).await;
let select_sql = "SELECT * FROM test_admin_gc_table ORDER BY ts";
let select_output = execute_sql(&instance, select_sql).await;
let expected = r#"
+---------------------+------+-------+
| ts | val | host |
+---------------------+------+-------+
| 2023-01-01T10:00:00 | 10.0 | host0 |
| 2023-01-01T11:00:00 | 20.0 | host0 |
| 2023-01-01T12:00:00 | 30.0 | host0 |
| 2023-01-02T10:00:00 | 11.0 | host1 |
| 2023-01-02T11:00:00 | 21.0 | host1 |
| 2023-01-02T12:00:00 | 31.0 | host1 |
| 2023-01-03T10:00:00 | 12.0 | host2 |
| 2023-01-03T11:00:00 | 22.0 | host2 |
| 2023-01-03T12:00:00 | 32.0 | host2 |
| 2023-01-04T10:00:00 | 13.0 | host3 |
| 2023-01-04T11:00:00 | 23.0 | host3 |
| 2023-01-04T12:00:00 | 33.0 | host3 |
+---------------------+------+-------+"#
.trim();
check_output_stream(select_output.data, expected).await;
info!("ADMIN GC_TABLE test completed successfully");
}
#[tokio::test]
async fn test_admin_gc_regions_different_store() {
let _ = dotenv::dotenv();
common_telemetry::init_default_ut_logging();
let store_type = StorageType::build_storage_types_based_on_env();
info!("store type: {:?}", store_type);
for store in store_type {
info!("Running admin GC regions test with storage type: {}", store);
test_admin_gc_regions(&store).await;
}
}
/// Test `ADMIN GC_REGIONS(region_id)` function
async fn test_admin_gc_regions(store_type: &StorageType) {
let (test_context, _guard) = distributed_with_gc(store_type).await;
let instance = test_context.frontend();
// Step 1: Create table with append_mode to easily generate multiple files
let create_table_sql = r#"
CREATE TABLE test_admin_gc_regions (
ts TIMESTAMP TIME INDEX,
val DOUBLE,
host STRING
) WITH (append_mode = 'true')
"#;
execute_sql(&instance, create_table_sql).await;
// Step 2: Generate SST files by inserting data and flushing multiple times
for i in 0..4 {
let insert_sql = format!(
r#"
INSERT INTO test_admin_gc_regions (ts, val, host) VALUES
('2023-01-0{} 10:00:00', {}, 'host{}'),
('2023-01-0{} 11:00:00', {}, 'host{}'),
('2023-01-0{} 12:00:00', {}, 'host{}')
"#,
i + 1,
10.0 + i as f64,
i,
i + 1,
20.0 + i as f64,
i,
i + 1,
30.0 + i as f64,
i
);
execute_sql(&instance, &insert_sql).await;
// Flush the table to create SST files
let flush_sql = "ADMIN FLUSH_TABLE('test_admin_gc_regions')";
execute_sql(&instance, flush_sql).await;
}
// List SST files before compaction (for verification)
let sst_files_before_compaction = list_sst_files(&test_context).await;
info!(
"SST files before compaction: {:?}",
sst_files_before_compaction
);
assert_eq!(sst_files_before_compaction.len(), 4); // 4 files from 4 flushes
// Step 3: Trigger compaction to create garbage SST files
let compact_sql = "ADMIN COMPACT_TABLE('test_admin_gc_regions')";
execute_sql(&instance, compact_sql).await;
// List SST files after compaction (should have both old and new files)
let sst_files_after_compaction = list_sst_files(&test_context).await;
info!(
"SST files after compaction: {:?}",
sst_files_after_compaction
);
assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new
// Step 4: Get region id from information_schema
let region_id_sql = "SELECT greptime_partition_id FROM information_schema.partitions WHERE table_name = 'test_admin_gc_regions' ORDER BY greptime_partition_id LIMIT 1";
let region_output = execute_sql(&instance, region_id_sql).await;
info!("Region ID output: {:?}", region_output);
let OutputData::Stream(region_stream) = region_output.data else {
panic!("Expected stream output for region id query");
};
// Extract the region id from the result
let region_id = {
let batches = RecordBatches::try_collect(region_stream).await.unwrap();
let batch = &batches.iter().next().unwrap();
let column = batch.column(0);
let array = column.as_primitive::<UInt64Type>();
array.value(0)
};
info!("Extracted region_id: {}", region_id);
// Step 5: Use ADMIN GC_REGIONS to clean up garbage files for the specific region
let gc_regions_sql = format!("ADMIN GC_REGIONS({})", region_id);
let gc_output = execute_sql(&instance, &gc_regions_sql).await;
info!("GC_REGIONS output: {:?}", gc_output);
// Step 6: Verify GC results
let sst_files_after_gc = list_sst_files(&test_context).await;
info!("SST files after GC: {:?}", sst_files_after_gc);
assert_eq!(sst_files_after_gc.len(), 1); // Only the compacted file should remain after gc
// Verify that data is still accessible
let count_sql = "SELECT COUNT(*) FROM test_admin_gc_regions";
let count_output = execute_sql(&instance, count_sql).await;
let expected = r#"
+----------+
| count(*) |
+----------+
| 12 |
+----------+"#
.trim();
check_output_stream(count_output.data, expected).await;
let select_sql = "SELECT * FROM test_admin_gc_regions ORDER BY ts";
let select_output = execute_sql(&instance, select_sql).await;
let expected = r#"
+---------------------+------+-------+
| ts | val | host |
+---------------------+------+-------+
| 2023-01-01T10:00:00 | 10.0 | host0 |
| 2023-01-01T11:00:00 | 20.0 | host0 |
| 2023-01-01T12:00:00 | 30.0 | host0 |
| 2023-01-02T10:00:00 | 11.0 | host1 |
| 2023-01-02T11:00:00 | 21.0 | host1 |
| 2023-01-02T12:00:00 | 31.0 | host1 |
| 2023-01-03T10:00:00 | 12.0 | host2 |
| 2023-01-03T11:00:00 | 22.0 | host2 |
| 2023-01-03T12:00:00 | 32.0 | host2 |
| 2023-01-04T10:00:00 | 13.0 | host3 |
| 2023-01-04T11:00:00 | 23.0 | host3 |
| 2023-01-04T12:00:00 | 33.0 | host3 |
+---------------------+------+-------+"#
.trim();
check_output_stream(select_output.data, expected).await;
info!("ADMIN GC_REGIONS test completed successfully");
}
#[tokio::test]
async fn test_admin_gc_missing_cases_different_store() {
let _ = dotenv::dotenv();
common_telemetry::init_default_ut_logging();
let store_type = StorageType::build_storage_types_based_on_env();
info!("store type: {:?}", store_type);
for store in store_type {
info!(
"Running admin GC missing cases test with storage type: {}",
store
);
test_admin_gc_missing_cases(&store).await;
}
}
async fn test_admin_gc_missing_cases(store_type: &StorageType) {
let (test_context, _guard) = distributed_with_gc(store_type).await;
let instance = test_context.frontend();
let metasrv = test_context.metasrv();
// Case 1: Invalid table name
let invalid_table_sql = "ADMIN GC_TABLE('no_such_table')";
let invalid_table_result = try_execute_sql(&instance, invalid_table_sql).await;
assert!(
invalid_table_result.is_err(),
"Expected error for invalid table name"
);
// Case 2: Invalid region id (same table, non-existent region number)
let invalid_region_table = "test_admin_gc_invalid_region";
create_append_table(&instance, invalid_region_table, None).await;
let table_id = get_table_id(&instance, invalid_region_table).await;
let region_ids = fetch_region_ids(&instance, invalid_region_table).await;
let base_region = RegionId::from_u64(*region_ids.first().expect("region id exists"));
let invalid_region = RegionId::new(table_id, base_region.region_number().saturating_add(100));
let invalid_region_sql = format!("ADMIN GC_REGIONS({})", invalid_region.as_u64());
let sst_before_invalid_region = list_sst_files(&test_context).await;
let invalid_region_result = try_execute_sql(&instance, &invalid_region_sql).await;
match invalid_region_result {
Ok(output) => {
let processed_regions = extract_u64_output(output.data).await;
assert_eq!(processed_regions, 0);
let sst_after_invalid_region = list_sst_files(&test_context).await;
assert_eq!(
sst_before_invalid_region.len(),
sst_after_invalid_region.len()
);
}
Err(_) => {
let sst_after_invalid_region = list_sst_files(&test_context).await;
assert_eq!(
sst_before_invalid_region.len(),
sst_after_invalid_region.len()
);
}
}
// Case 3: No garbage to collect (idempotent)
let no_garbage_table = "test_admin_gc_no_garbage";
create_append_table(&instance, no_garbage_table, None).await;
insert_and_flush(&instance, no_garbage_table, 1).await;
let sst_before_gc = list_sst_files(&test_context).await;
let no_garbage_gc_sql = format!("ADMIN GC_TABLE('{no_garbage_table}')");
let no_garbage_output = execute_sql(&instance, &no_garbage_gc_sql).await;
let processed_regions = extract_u64_output(no_garbage_output.data).await;
assert_eq!(processed_regions, 1);
let sst_after_gc = list_sst_files(&test_context).await;
assert_eq!(sst_before_gc.len(), sst_after_gc.len());
// Case 4: Multi-region table, multi-region GC
let multi_region_table = "test_admin_gc_multi_region";
let partition_clause =
Some("PARTITION ON COLUMNS (host) (host < 'm', host >= 'm')".to_string());
create_append_table(&instance, multi_region_table, partition_clause.as_deref()).await;
insert_and_flush(&instance, multi_region_table, 4).await;
let sst_before_compaction = list_sst_files(&test_context).await;
let compact_sql = format!("ADMIN COMPACT_TABLE('{multi_region_table}')");
execute_sql(&instance, &compact_sql).await;
let sst_after_compaction = list_sst_files(&test_context).await;
assert!(sst_after_compaction.len() >= sst_before_compaction.len());
let region_ids = fetch_region_ids(&instance, multi_region_table).await;
let gc_regions_sql = format!(
"ADMIN GC_REGIONS({})",
region_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(",")
);
let gc_regions_output = execute_sql(&instance, &gc_regions_sql).await;
let processed_regions = extract_u64_output(gc_regions_output.data).await;
assert_eq!(processed_regions as usize, region_ids.len());
let sst_after_gc = list_sst_files(&test_context).await;
assert!(sst_after_gc.len() < sst_after_compaction.len());
// Case 5: Manual GC on dropped region
let dropped_region_table = "test_admin_gc_dropped_region";
create_append_table(&instance, dropped_region_table, partition_clause.as_deref()).await;
let dropped_table_id = get_table_id(&instance, dropped_region_table).await;
let (_routes, regions) =
super::get_table_route(metasrv.table_metadata_manager(), dropped_table_id).await;
let base_region = *regions.first().expect("table has at least one region");
let dropped_region = RegionId::new(
dropped_table_id,
base_region.region_number().saturating_add(100),
);
let dst_region = RegionId::new(
dropped_table_id,
base_region.region_number().saturating_add(200),
);
let repart_mgr = metasrv.table_metadata_manager().table_repart_manager();
let current = repart_mgr
.get_with_raw_bytes(dropped_table_id)
.await
.unwrap();
let mut repart_value = TableRepartValue::new();
repart_value.update_mappings(dropped_region, &[dst_region]);
repart_mgr
.upsert_value(dropped_table_id, current, &repart_value)
.await
.unwrap();
let dropped_gc_sql = format!("ADMIN GC_REGIONS({})", dropped_region.as_u64());
let dropped_gc_output = execute_sql(&instance, &dropped_gc_sql).await;
let processed_regions = extract_u64_output(dropped_gc_output.data).await;
assert_eq!(processed_regions, 1);
// Case 6: Cooldown bypass for manual GC (should still process regions on immediate re-run)
let cooldown_region = *region_ids.first().expect("region id exists");
let cooldown_gc_sql = format!("ADMIN GC_REGIONS({cooldown_region})");
execute_sql(&instance, &cooldown_gc_sql).await;
let cooldown_gc_output = execute_sql(&instance, &cooldown_gc_sql).await;
let processed_regions = extract_u64_output(cooldown_gc_output.data).await;
assert_eq!(processed_regions, 1);
}
async fn create_append_table(
instance: &std::sync::Arc<frontend::instance::Instance>,
table_name: &str,
partition_clause: Option<&str>,
) {
let partition_clause = partition_clause.unwrap_or("");
let create_table_sql = format!(
"\
CREATE TABLE {table_name} (\
ts TIMESTAMP TIME INDEX,\
val DOUBLE,\
host STRING\
) {partition_clause} WITH (append_mode = 'true')\
"
);
execute_sql(instance, &create_table_sql).await;
}
async fn insert_and_flush(
instance: &std::sync::Arc<frontend::instance::Instance>,
table_name: &str,
rounds: usize,
) {
for i in 0..rounds {
let insert_sql = format!(
"\
INSERT INTO {table_name} (ts, val, host) VALUES\
('2023-01-0{} 10:00:00', {}, 'host{}'),\
('2023-01-0{} 11:00:00', {}, 'host{}'),\
('2023-01-0{} 12:00:00', {}, 'host{}')\
",
i + 1,
10.0 + i as f64,
i,
i + 1,
20.0 + i as f64,
i,
i + 1,
30.0 + i as f64,
i
);
execute_sql(instance, &insert_sql).await;
let flush_sql = format!("ADMIN FLUSH_TABLE('{table_name}')");
execute_sql(instance, &flush_sql).await;
}
}
async fn fetch_region_ids(
instance: &std::sync::Arc<frontend::instance::Instance>,
table_name: &str,
) -> Vec<u64> {
let region_id_sql = format!(
"SELECT greptime_partition_id FROM information_schema.partitions WHERE table_name = '{}' ORDER BY greptime_partition_id",
table_name
);
let region_output = execute_sql(instance, &region_id_sql).await;
let OutputData::Stream(region_stream) = region_output.data else {
panic!("Expected stream output for region id query");
};
let batches = RecordBatches::try_collect(region_stream).await.unwrap();
let mut region_ids = Vec::new();
for batch in batches.iter() {
let column = batch.column(0);
let array = column.as_primitive::<UInt64Type>();
for idx in 0..array.len() {
if array.is_valid(idx) {
region_ids.push(array.value(idx));
}
}
}
region_ids
}
async fn get_table_id(
instance: &std::sync::Arc<frontend::instance::Instance>,
table_name: &str,
) -> table::metadata::TableId {
let table = instance
.catalog_manager()
.table("greptime", "public", table_name, None)
.await
.unwrap()
.unwrap();
table.table_info().table_id()
}
async fn extract_u64_output(output: OutputData) -> u64 {
let recordbatches = match output {
OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(),
OutputData::RecordBatches(recordbatches) => recordbatches,
_ => panic!("Unexpected output type"),
};
let batch = recordbatches.iter().next().expect("non-empty recordbatch");
let column = batch.column(0);
let array = column.as_primitive::<UInt64Type>();
array.value(0)
}

View File

@@ -125,8 +125,17 @@ async fn trigger_table_gc(metasrv: &Arc<Metasrv>, table_name: &str) {
async fn trigger_full_gc(ticker: &GcTickerRef) {
info!("triggering full gc");
let (tx, rx) = oneshot::channel();
ticker.sender.send(gc::Event::Manually(tx)).await.unwrap();
let _ = rx.await.unwrap();
ticker
.sender
.send(gc::Event::Manually {
sender: tx,
region_ids: None,
full_file_listing: None,
timeout: None,
})
.await
.unwrap();
let _ = rx.await.unwrap().unwrap();
}
fn query_partitions_sql(table_name: &str) -> String {