diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs index e7fd186b86..c31b608967 100644 --- a/src/common/function/src/admin.rs +++ b/src/common/function/src/admin.rs @@ -15,6 +15,7 @@ mod build_index_table; mod flush_compact_region; mod flush_compact_table; +mod gc; mod migrate_region; mod reconcile_catalog; mod reconcile_database; @@ -22,6 +23,7 @@ mod reconcile_table; use flush_compact_region::{CompactRegionFunction, FlushRegionFunction}; use flush_compact_table::{CompactTableFunction, FlushTableFunction}; +use gc::{GcRegionsFunction, GcTableFunction}; use migrate_region::MigrateRegionFunction; use reconcile_catalog::ReconcileCatalogFunction; use reconcile_database::ReconcileDatabaseFunction; @@ -42,6 +44,8 @@ impl AdminFunction { registry.register(CompactRegionFunction::factory()); registry.register(FlushTableFunction::factory()); registry.register(CompactTableFunction::factory()); + registry.register(GcRegionsFunction::factory()); + registry.register(GcTableFunction::factory()); registry.register(BuildIndexFunction::factory()); registry.register(FlushFlowFunction::factory()); registry.register(ReconcileCatalogFunction::factory()); diff --git a/src/common/function/src/admin/gc.rs b/src/common/function/src/admin/gc.rs new file mode 100644 index 0000000000..07473b724e --- /dev/null +++ b/src/common/function/src/admin/gc.rs @@ -0,0 +1,139 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_error::ext::BoxedError; +use common_macro::admin_fn; +use common_meta::rpc::procedure::{GcRegionsRequest, GcTableRequest}; +use common_query::error::{ + InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, + UnsupportedInputDataTypeSnafu, +}; +use datafusion_expr::{Signature, TypeSignature, Volatility}; +use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::prelude::*; +use session::context::QueryContextRef; +use snafu::{ResultExt, ensure}; + +use crate::handlers::ProcedureServiceHandlerRef; +use crate::helper::cast_u64; + +const DEFAULT_GC_TIMEOUT: Duration = Duration::from_secs(60); + +#[admin_fn( + name = GcRegionsFunction, + display_name = gc_regions, + sig_fn = gc_regions_signature, + ret = uint64 +)] +pub(crate) async fn gc_regions( + procedure_service_handler: &ProcedureServiceHandlerRef, + _ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + ensure!( + !params.is_empty(), + InvalidFuncArgsSnafu { + err_msg: "The length of the args is not correct, expect at least 1 region id, have 0" + .to_string(), + } + ); + + let mut region_ids = Vec::with_capacity(params.len()); + for param in params { + let Some(region_id) = cast_u64(param)? else { + return UnsupportedInputDataTypeSnafu { + function: "gc_regions", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + region_ids.push(region_id); + } + + let resp = procedure_service_handler + .gc_regions(GcRegionsRequest { + region_ids, + full_file_listing: false, + timeout: DEFAULT_GC_TIMEOUT, + }) + .await?; + + Ok(Value::from(resp.processed_regions)) +} + +#[admin_fn( + name = GcTableFunction, + display_name = gc_table, + sig_fn = gc_table_signature, + ret = uint64 +)] +pub(crate) async fn gc_table( + procedure_service_handler: &ProcedureServiceHandlerRef, + query_ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + ensure!( + params.len() == 1, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 1, have: {}", + params.len() + ), + } + ); + + let ValueRef::String(table_name) = params[0] else { + return UnsupportedInputDataTypeSnafu { + function: "gc_table", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + + let (catalog_name, schema_name, table_name) = + session::table_name::table_name_to_full_name(table_name, query_ctx) + .map_err(BoxedError::new) + .context(TableMutationSnafu)?; + + let resp = procedure_service_handler + .gc_table(GcTableRequest { + catalog_name, + schema_name, + table_name, + full_file_listing: false, + timeout: DEFAULT_GC_TIMEOUT, + }) + .await?; + + Ok(Value::from(resp.processed_regions)) +} + +fn gc_regions_signature() -> Signature { + Signature::variadic( + ConcreteDataType::numerics() + .into_iter() + .map(|dt| dt.as_arrow_type()) + .collect(), + Volatility::Immutable, + ) +} + +fn gc_table_signature() -> Signature { + Signature::one_of( + vec![TypeSignature::Uniform(1, vec![ArrowDataType::Utf8])], + Volatility::Immutable, + ) +} diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index 0e6060e90c..50318fd14f 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -19,7 +19,9 @@ use async_trait::async_trait; use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ - ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse, + GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, + ProcedureStateResponse, }; use common_query::Output; use common_query::error::Result; @@ -85,6 +87,12 @@ pub trait ProcedureServiceHandler: Send + Sync { /// Get the catalog manager fn catalog_manager(&self) -> &CatalogManagerRef; + + /// Manually trigger GC for specific regions. + async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result; + + /// Manually trigger GC for a table. + async fn gc_table(&self, request: MetaGcTableRequest) -> Result; } /// This flow service handler is only use for flush flow for now. diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index d1a3d341b4..06dff44e79 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -37,7 +37,8 @@ impl FunctionState { use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ - ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest, + MigrateRegionRequest, ProcedureStateResponse, }; use common_query::Output; use common_query::error::Result; @@ -82,6 +83,24 @@ impl FunctionState { Ok(()) } + async fn gc_regions(&self, _request: GcRegionsRequest) -> Result { + Ok(GcResponse { + processed_regions: 1, + need_retry_regions: vec![], + deleted_files: 0, + deleted_indexes: 0, + }) + } + + async fn gc_table(&self, _request: GcTableRequest) -> Result { + Ok(GcResponse { + processed_regions: 1, + need_retry_regions: vec![], + deleted_files: 0, + deleted_indexes: 0, + }) + } + fn catalog_manager(&self) -> &CatalogManagerRef { unimplemented!() } diff --git a/src/common/meta/src/procedure_executor.rs b/src/common/meta/src/procedure_executor.rs index f27f7ebf47..c0165dd80f 100644 --- a/src/common/meta/src/procedure_executor.rs +++ b/src/common/meta/src/procedure_executor.rs @@ -25,8 +25,8 @@ use crate::error::{ }; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{ - self, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, - ProcedureStateResponse, + self, GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest, + MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, }; /// The context of procedure executor. @@ -78,6 +78,30 @@ pub trait ProcedureExecutor: Send + Sync { pid: &str, ) -> Result; + /// Manually trigger GC for the specified regions. + async fn gc_regions( + &self, + _ctx: &ExecutorContext, + _request: GcRegionsRequest, + ) -> Result { + UnsupportedSnafu { + operation: "gc_regions", + } + .fail() + } + + /// Manually trigger GC for the specified table. + async fn gc_table( + &self, + _ctx: &ExecutorContext, + _request: GcTableRequest, + ) -> Result { + UnsupportedSnafu { + operation: "gc_table", + } + .fail() + } + async fn list_procedures(&self, ctx: &ExecutorContext) -> Result; } diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs index 313b7e178e..baab254fef 100644 --- a/src/common/meta/src/rpc/procedure.rs +++ b/src/common/meta/src/rpc/procedure.rs @@ -78,6 +78,30 @@ pub struct RemoveRegionFollowerRequest { pub peer_id: u64, } +#[derive(Debug, Clone)] +pub struct GcRegionsRequest { + pub region_ids: Vec, + pub full_file_listing: bool, + pub timeout: Duration, +} + +#[derive(Debug, Clone)] +pub struct GcTableRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub full_file_listing: bool, + pub timeout: Duration, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct GcResponse { + pub processed_regions: u64, + pub need_retry_regions: Vec, + pub deleted_files: u64, + pub deleted_indexes: u64, +} + /// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`]. pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result { ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| { diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 00b208211a..4a79105615 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -48,9 +48,9 @@ use common_meta::range_stream::PaginationStream; use common_meta::rpc::KeyValue; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::procedure::{ - AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest, - MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, - RemoveRegionFollowerRequest, RemoveTableFollowerRequest, + AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse, + GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, + ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest, }; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -387,6 +387,28 @@ impl ProcedureExecutor for MetaClient { .context(meta_error::ExternalSnafu) } + async fn gc_regions( + &self, + _ctx: &ExecutorContext, + request: GcRegionsRequest, + ) -> MetaResult { + self.gc_regions(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } + + async fn gc_table( + &self, + _ctx: &ExecutorContext, + request: GcTableRequest, + ) -> MetaResult { + self.gc_table(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } + async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult { self.procedure_client() .map_err(BoxedError::new) @@ -715,6 +737,16 @@ impl MetaClient { self.procedure_client()?.reconcile(request).await } + /// Manually trigger GC for specific regions. + pub async fn gc_regions(&self, request: GcRegionsRequest) -> Result { + self.procedure_client()?.gc_regions(request).await + } + + /// Manually trigger GC for a table (all its regions). + pub async fn gc_table(&self, request: GcTableRequest) -> Result { + self.procedure_client()?.gc_table(request).await + } + /// Submit a DDL task pub async fn submit_ddl_task( &self, diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index f813aebe20..f854193327 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -18,11 +18,16 @@ use std::time::Duration; use api::v1::meta::procedure_service_client::ProcedureServiceClient; use api::v1::meta::{ - DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, - ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, - QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role, + DdlTaskRequest, DdlTaskResponse, GcRegionsRequest, GcRegionsResponse, GcTableRequest, + GcTableResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest, + ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, QueryProcedureRequest, + ReconcileRequest, ReconcileResponse, RequestHeader, ResponseHeader, Role, }; use common_grpc::channel_manager::ChannelManager; +use common_meta::rpc::procedure::{ + GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse, + GcTableRequest as MetaGcTableRequest, +}; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{error, info, warn}; use snafu::{ResultExt, ensure}; @@ -105,6 +110,16 @@ impl Client { let inner = self.inner.read().await; inner.list_procedures().await } + + pub async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result { + let inner = self.inner.read().await; + inner.gc_regions(request).await + } + + pub async fn gc_table(&self, request: MetaGcTableRequest) -> Result { + let inner = self.inner.read().await; + inner.gc_table(request).await + } } #[derive(Debug)] @@ -260,6 +275,74 @@ impl Inner { .await } + async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result { + let req = GcRegionsRequest { + header: Some(RequestHeader { + protocol_version: 0, + member_id: self.id, + role: self.role as i32, + tracing_context: TracingContext::from_current_span().to_w3c(), + }), + region_ids: request.region_ids, + full_file_listing: request.full_file_listing, + timeout_secs: request.timeout.as_secs() as u32, + }; + + let resp: GcRegionsResponse = self + .with_retry( + "gc_regions", + move |mut client| { + let mut req = Request::new(req.clone()); + req.set_timeout(self.timeout); + async move { client.gc_regions(req).await.map(|res| res.into_inner()) } + }, + |resp: &GcRegionsResponse| &resp.header, + ) + .await?; + + Ok(MetaGcResponse { + processed_regions: resp.processed_regions, + need_retry_regions: resp.need_retry_regions, + deleted_files: resp.deleted_files, + deleted_indexes: resp.deleted_indexes, + }) + } + + async fn gc_table(&self, request: MetaGcTableRequest) -> Result { + let req = GcTableRequest { + header: Some(RequestHeader { + protocol_version: 0, + member_id: self.id, + role: self.role as i32, + tracing_context: TracingContext::from_current_span().to_w3c(), + }), + catalog_name: request.catalog_name, + schema_name: request.schema_name, + table_name: request.table_name, + full_file_listing: request.full_file_listing, + timeout_secs: request.timeout.as_secs() as u32, + }; + + let resp: GcTableResponse = self + .with_retry( + "gc_table", + move |mut client| { + let mut req = Request::new(req.clone()); + req.set_timeout(self.timeout); + async move { client.gc_table(req).await.map(|res| res.into_inner()) } + }, + |resp: &GcTableResponse| &resp.header, + ) + .await?; + + Ok(MetaGcResponse { + processed_regions: resp.processed_regions, + need_retry_regions: resp.need_retry_regions, + deleted_files: resp.deleted_files, + deleted_indexes: resp.deleted_indexes, + }) + } + async fn query_procedure_state(&self, pid: &str) -> Result { let mut req = QueryProcedureRequest { pid: Some(ProcedureId { key: pid.into() }), @@ -333,10 +416,11 @@ mod tests { use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer}; use api::v1::meta::procedure_service_server::{ProcedureService, ProcedureServiceServer}; use api::v1::meta::{ - AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, HeartbeatRequest, - HeartbeatResponse, MigrateRegionRequest, MigrateRegionResponse, Peer, - ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, - QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role, + AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, GcRegionsRequest, + GcRegionsResponse, GcTableRequest, GcTableResponse, HeartbeatRequest, HeartbeatResponse, + MigrateRegionRequest, MigrateRegionResponse, Peer, ProcedureDetailRequest, + ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ReconcileRequest, + ReconcileResponse, ResponseHeader, Role, }; use async_trait::async_trait; use common_error::status_code::StatusCode; @@ -435,6 +519,20 @@ mod tests { ) -> Result, Status> { Err(Status::unimplemented("details is not used in this test")) } + + async fn gc_regions( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("gc_regions is not used in this test")) + } + + async fn gc_table( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("gc_table is not used in this test")) + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index 90fb66f39c..0754e7b777 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -16,18 +16,28 @@ use std::time::Duration; use api::v1::meta::reconcile_request::Target; use api::v1::meta::{ - DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, + DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, GcRegionsRequest, + GcRegionsResponse, GcTableRequest, GcTableResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest, ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server, }; +use common_meta::key::TableMetadataManagerRef; +use common_meta::key::table_name::TableNameKey; use common_meta::procedure_executor::ExecutorContext; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; -use common_meta::rpc::procedure; +use common_meta::rpc::procedure::{ + self, GcRegionsRequest as MetaGcRegionsRequest, GcResponse, + GcTableRequest as MetaGcTableRequest, +}; +use common_procedure::{ProcedureWithId, watcher}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use table::table_reference::TableReference; use tonic::Request; +use crate::error::{TableMetadataManagerSnafu, TableNotFoundSnafu}; +use crate::gc::{BatchGcProcedure, Region2Peers}; use crate::metasrv::Metasrv; use crate::procedure::region_migration::manager::{ RegionMigrationProcedureTask, RegionMigrationTriggerReason, @@ -240,4 +250,192 @@ impl procedure_service_server::ProcedureService for Metasrv { metas, ))) } + + async fn gc_regions( + &self, + request: Request, + ) -> GrpcResult { + check_leader!(self, request, GcRegionsResponse, "`gc_regions`"); + + let GcRegionsRequest { + header, + region_ids, + full_file_listing, + timeout_secs, + } = request.into_inner(); + + let _header = header.context(error::MissingRequestHeaderSnafu)?; + + let response = self + .handle_gc_regions(MetaGcRegionsRequest { + region_ids, + full_file_listing, + timeout: Duration::from_secs(timeout_secs as u64), + }) + .await?; + + Ok(Response::new(gc_response_to_regions_pb(response))) + } + + async fn gc_table(&self, request: Request) -> GrpcResult { + check_leader!(self, request, GcTableResponse, "`gc_table`"); + + let GcTableRequest { + header, + catalog_name, + schema_name, + table_name, + full_file_listing, + timeout_secs, + } = request.into_inner(); + + let _header = header.context(error::MissingRequestHeaderSnafu)?; + + let response = self + .handle_gc_table(MetaGcTableRequest { + catalog_name, + schema_name, + table_name, + full_file_listing, + timeout: Duration::from_secs(timeout_secs as u64), + }) + .await?; + + Ok(Response::new(gc_response_to_table_pb(response))) + } +} + +impl Metasrv { + async fn handle_gc_regions(&self, request: MetaGcRegionsRequest) -> error::Result { + let region_ids = request + .region_ids + .into_iter() + .map(RegionId::from_u64) + .collect::>(); + let report = self + .run_gc_procedure( + region_ids.clone(), + request.full_file_listing, + request.timeout, + ) + .await?; + + Ok(gc_report_to_response(&report, region_ids.len() as u64)) + } + + async fn handle_gc_table(&self, request: MetaGcTableRequest) -> error::Result { + let table_name_key = TableNameKey::new( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ); + + let table_metadata_manager: &TableMetadataManagerRef = self.table_metadata_manager(); + let table_id = table_metadata_manager + .table_name_manager() + .get(table_name_key) + .await + .context(TableMetadataManagerSnafu)? + .context(TableNotFoundSnafu { + name: request.table_name.clone(), + })? + .table_id(); + + let (_phy_table_id, route) = table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await + .context(TableMetadataManagerSnafu)?; + + let region_ids = route + .region_routes + .iter() + .map(|r| r.region.id) + .collect::>(); + + let report = self + .run_gc_procedure( + region_ids.clone(), + request.full_file_listing, + request.timeout, + ) + .await?; + + Ok(gc_report_to_response(&report, region_ids.len() as u64)) + } + + async fn run_gc_procedure( + &self, + region_ids: Vec, + full_file_listing: bool, + timeout: Duration, + ) -> error::Result { + let procedure = BatchGcProcedure::new( + self.mailbox().clone(), + self.table_metadata_manager().clone(), + self.options().grpc.server_addr.clone(), + region_ids, + full_file_listing, + timeout, + Region2Peers::new(), + ); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let mut watcher = self + .procedure_manager() + .submit(procedure_with_id) + .await + .context(error::SubmitProcedureSnafu)?; + + let res = watcher::wait(&mut watcher) + .await + .context(error::WaitProcedureSnafu)? + .with_context(|| error::UnexpectedSnafu { + violated: "GC procedure completed but returned no result".to_string(), + })?; + + BatchGcProcedure::cast_result(res) + } +} + +fn gc_report_to_response( + report: &store_api::storage::GcReport, + processed_regions: u64, +) -> GcResponse { + let deleted_files = report.deleted_files.values().map(|v| v.len() as u64).sum(); + let deleted_indexes = report + .deleted_indexes + .values() + .map(|v| v.len() as u64) + .sum(); + GcResponse { + processed_regions, + need_retry_regions: report + .need_retry_regions + .iter() + .map(|id| id.as_u64()) + .collect(), + deleted_files, + deleted_indexes, + } +} + +fn gc_response_to_regions_pb(resp: GcResponse) -> GcRegionsResponse { + GcRegionsResponse { + processed_regions: resp.processed_regions, + need_retry_regions: resp.need_retry_regions, + deleted_files: resp.deleted_files, + deleted_indexes: resp.deleted_indexes, + ..Default::default() + } +} + +fn gc_response_to_table_pb(resp: GcResponse) -> GcTableResponse { + GcTableResponse { + processed_regions: resp.processed_regions, + need_retry_regions: resp.need_retry_regions, + deleted_files: resp.deleted_files, + deleted_indexes: resp.deleted_indexes, + ..Default::default() + } } diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs index d6a2fbe03f..d98c09b77c 100644 --- a/src/operator/src/procedure.rs +++ b/src/operator/src/procedure.rs @@ -19,7 +19,9 @@ use common_error::ext::BoxedError; use common_function::handlers::ProcedureServiceHandler; use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef}; use common_meta::rpc::procedure::{ - ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse, + GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, + ProcedureStateResponse, }; use common_query::error as query_error; use common_query::error::Result as QueryResult; @@ -90,4 +92,20 @@ impl ProcedureServiceHandler for ProcedureServiceOperator { fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } + + async fn gc_regions(&self, request: MetaGcRegionsRequest) -> QueryResult { + self.procedure_executor + .gc_regions(&ExecutorContext::default(), request) + .await + .map_err(BoxedError::new) + .context(query_error::ProcedureServiceSnafu) + } + + async fn gc_table(&self, request: MetaGcTableRequest) -> QueryResult { + self.procedure_executor + .gc_table(&ExecutorContext::default(), request) + .await + .map_err(BoxedError::new) + .context(query_error::ProcedureServiceSnafu) + } }