mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 12:30:38 +00:00
@@ -15,6 +15,7 @@
|
||||
mod build_index_table;
|
||||
mod flush_compact_region;
|
||||
mod flush_compact_table;
|
||||
mod gc;
|
||||
mod migrate_region;
|
||||
mod reconcile_catalog;
|
||||
mod reconcile_database;
|
||||
@@ -22,6 +23,7 @@ mod reconcile_table;
|
||||
|
||||
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
|
||||
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
||||
use gc::{GcRegionsFunction, GcTableFunction};
|
||||
use migrate_region::MigrateRegionFunction;
|
||||
use reconcile_catalog::ReconcileCatalogFunction;
|
||||
use reconcile_database::ReconcileDatabaseFunction;
|
||||
@@ -42,6 +44,8 @@ impl AdminFunction {
|
||||
registry.register(CompactRegionFunction::factory());
|
||||
registry.register(FlushTableFunction::factory());
|
||||
registry.register(CompactTableFunction::factory());
|
||||
registry.register(GcRegionsFunction::factory());
|
||||
registry.register(GcTableFunction::factory());
|
||||
registry.register(BuildIndexFunction::factory());
|
||||
registry.register(FlushFlowFunction::factory());
|
||||
registry.register(ReconcileCatalogFunction::factory());
|
||||
|
||||
139
src/common/function/src/admin/gc.rs
Normal file
139
src/common/function/src/admin/gc.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_macro::admin_fn;
|
||||
use common_meta::rpc::procedure::{GcRegionsRequest, GcTableRequest};
|
||||
use common_query::error::{
|
||||
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu,
|
||||
UnsupportedInputDataTypeSnafu,
|
||||
};
|
||||
use datafusion_expr::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::arrow::datatypes::DataType as ArrowDataType;
|
||||
use datatypes::prelude::*;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ResultExt, ensure};
|
||||
|
||||
use crate::handlers::ProcedureServiceHandlerRef;
|
||||
use crate::helper::cast_u64;
|
||||
|
||||
const DEFAULT_GC_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
#[admin_fn(
|
||||
name = GcRegionsFunction,
|
||||
display_name = gc_regions,
|
||||
sig_fn = gc_regions_signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn gc_regions(
|
||||
procedure_service_handler: &ProcedureServiceHandlerRef,
|
||||
_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
!params.is_empty(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: "The length of the args is not correct, expect at least 1 region id, have 0"
|
||||
.to_string(),
|
||||
}
|
||||
);
|
||||
|
||||
let mut region_ids = Vec::with_capacity(params.len());
|
||||
for param in params {
|
||||
let Some(region_id) = cast_u64(param)? else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "gc_regions",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
region_ids.push(region_id);
|
||||
}
|
||||
|
||||
let resp = procedure_service_handler
|
||||
.gc_regions(GcRegionsRequest {
|
||||
region_ids,
|
||||
full_file_listing: false,
|
||||
timeout: DEFAULT_GC_TIMEOUT,
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Value::from(resp.processed_regions))
|
||||
}
|
||||
|
||||
#[admin_fn(
|
||||
name = GcTableFunction,
|
||||
display_name = gc_table,
|
||||
sig_fn = gc_table_signature,
|
||||
ret = uint64
|
||||
)]
|
||||
pub(crate) async fn gc_table(
|
||||
procedure_service_handler: &ProcedureServiceHandlerRef,
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
ensure!(
|
||||
params.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 1, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let ValueRef::String(table_name) = params[0] else {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
function: "gc_table",
|
||||
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
let (catalog_name, schema_name, table_name) =
|
||||
session::table_name::table_name_to_full_name(table_name, query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(TableMutationSnafu)?;
|
||||
|
||||
let resp = procedure_service_handler
|
||||
.gc_table(GcTableRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
full_file_listing: false,
|
||||
timeout: DEFAULT_GC_TIMEOUT,
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Value::from(resp.processed_regions))
|
||||
}
|
||||
|
||||
fn gc_regions_signature() -> Signature {
|
||||
Signature::variadic(
|
||||
ConcreteDataType::numerics()
|
||||
.into_iter()
|
||||
.map(|dt| dt.as_arrow_type())
|
||||
.collect(),
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
fn gc_table_signature() -> Signature {
|
||||
Signature::one_of(
|
||||
vec![TypeSignature::Uniform(1, vec![ArrowDataType::Utf8])],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
@@ -19,7 +19,9 @@ use async_trait::async_trait;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_base::AffectedRows;
|
||||
use common_meta::rpc::procedure::{
|
||||
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
|
||||
GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
|
||||
GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest,
|
||||
ProcedureStateResponse,
|
||||
};
|
||||
use common_query::Output;
|
||||
use common_query::error::Result;
|
||||
@@ -85,6 +87,12 @@ pub trait ProcedureServiceHandler: Send + Sync {
|
||||
|
||||
/// Get the catalog manager
|
||||
fn catalog_manager(&self) -> &CatalogManagerRef;
|
||||
|
||||
/// Manually trigger GC for specific regions.
|
||||
async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse>;
|
||||
|
||||
/// Manually trigger GC for a table.
|
||||
async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse>;
|
||||
}
|
||||
|
||||
/// This flow service handler is only use for flush flow for now.
|
||||
|
||||
@@ -37,7 +37,8 @@ impl FunctionState {
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_base::AffectedRows;
|
||||
use common_meta::rpc::procedure::{
|
||||
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
|
||||
GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
|
||||
MigrateRegionRequest, ProcedureStateResponse,
|
||||
};
|
||||
use common_query::Output;
|
||||
use common_query::error::Result;
|
||||
@@ -82,6 +83,24 @@ impl FunctionState {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn gc_regions(&self, _request: GcRegionsRequest) -> Result<GcResponse> {
|
||||
Ok(GcResponse {
|
||||
processed_regions: 1,
|
||||
need_retry_regions: vec![],
|
||||
deleted_files: 0,
|
||||
deleted_indexes: 0,
|
||||
})
|
||||
}
|
||||
|
||||
async fn gc_table(&self, _request: GcTableRequest) -> Result<GcResponse> {
|
||||
Ok(GcResponse {
|
||||
processed_regions: 1,
|
||||
need_retry_regions: vec![],
|
||||
deleted_files: 0,
|
||||
deleted_indexes: 0,
|
||||
})
|
||||
}
|
||||
|
||||
fn catalog_manager(&self) -> &CatalogManagerRef {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@@ -25,8 +25,8 @@ use crate::error::{
|
||||
};
|
||||
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
|
||||
use crate::rpc::procedure::{
|
||||
self, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
|
||||
ProcedureStateResponse,
|
||||
self, GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
|
||||
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
|
||||
};
|
||||
|
||||
/// The context of procedure executor.
|
||||
@@ -78,6 +78,30 @@ pub trait ProcedureExecutor: Send + Sync {
|
||||
pid: &str,
|
||||
) -> Result<ProcedureStateResponse>;
|
||||
|
||||
/// Manually trigger GC for the specified regions.
|
||||
async fn gc_regions(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
_request: GcRegionsRequest,
|
||||
) -> Result<GcResponse> {
|
||||
UnsupportedSnafu {
|
||||
operation: "gc_regions",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
/// Manually trigger GC for the specified table.
|
||||
async fn gc_table(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
_request: GcTableRequest,
|
||||
) -> Result<GcResponse> {
|
||||
UnsupportedSnafu {
|
||||
operation: "gc_table",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
|
||||
}
|
||||
|
||||
|
||||
@@ -78,6 +78,30 @@ pub struct RemoveRegionFollowerRequest {
|
||||
pub peer_id: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GcRegionsRequest {
|
||||
pub region_ids: Vec<u64>,
|
||||
pub full_file_listing: bool,
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GcTableRequest {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub full_file_listing: bool,
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
||||
pub struct GcResponse {
|
||||
pub processed_regions: u64,
|
||||
pub need_retry_regions: Vec<u64>,
|
||||
pub deleted_files: u64,
|
||||
pub deleted_indexes: u64,
|
||||
}
|
||||
|
||||
/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
|
||||
pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
|
||||
ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {
|
||||
|
||||
@@ -48,9 +48,9 @@ use common_meta::range_stream::PaginationStream;
|
||||
use common_meta::rpc::KeyValue;
|
||||
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
|
||||
use common_meta::rpc::procedure::{
|
||||
AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
|
||||
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
|
||||
RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
|
||||
AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse,
|
||||
GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
|
||||
ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
|
||||
};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
@@ -387,6 +387,28 @@ impl ProcedureExecutor for MetaClient {
|
||||
.context(meta_error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn gc_regions(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
request: GcRegionsRequest,
|
||||
) -> MetaResult<GcResponse> {
|
||||
self.gc_regions(request)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn gc_table(
|
||||
&self,
|
||||
_ctx: &ExecutorContext,
|
||||
request: GcTableRequest,
|
||||
) -> MetaResult<GcResponse> {
|
||||
self.gc_table(request)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
|
||||
self.procedure_client()
|
||||
.map_err(BoxedError::new)
|
||||
@@ -715,6 +737,16 @@ impl MetaClient {
|
||||
self.procedure_client()?.reconcile(request).await
|
||||
}
|
||||
|
||||
/// Manually trigger GC for specific regions.
|
||||
pub async fn gc_regions(&self, request: GcRegionsRequest) -> Result<GcResponse> {
|
||||
self.procedure_client()?.gc_regions(request).await
|
||||
}
|
||||
|
||||
/// Manually trigger GC for a table (all its regions).
|
||||
pub async fn gc_table(&self, request: GcTableRequest) -> Result<GcResponse> {
|
||||
self.procedure_client()?.gc_table(request).await
|
||||
}
|
||||
|
||||
/// Submit a DDL task
|
||||
pub async fn submit_ddl_task(
|
||||
&self,
|
||||
|
||||
@@ -18,11 +18,16 @@ use std::time::Duration;
|
||||
|
||||
use api::v1::meta::procedure_service_client::ProcedureServiceClient;
|
||||
use api::v1::meta::{
|
||||
DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
|
||||
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse,
|
||||
QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role,
|
||||
DdlTaskRequest, DdlTaskResponse, GcRegionsRequest, GcRegionsResponse, GcTableRequest,
|
||||
GcTableResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest,
|
||||
ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, QueryProcedureRequest,
|
||||
ReconcileRequest, ReconcileResponse, RequestHeader, ResponseHeader, Role,
|
||||
};
|
||||
use common_grpc::channel_manager::ChannelManager;
|
||||
use common_meta::rpc::procedure::{
|
||||
GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
|
||||
GcTableRequest as MetaGcTableRequest,
|
||||
};
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use snafu::{ResultExt, ensure};
|
||||
@@ -105,6 +110,16 @@ impl Client {
|
||||
let inner = self.inner.read().await;
|
||||
inner.list_procedures().await
|
||||
}
|
||||
|
||||
pub async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
|
||||
let inner = self.inner.read().await;
|
||||
inner.gc_regions(request).await
|
||||
}
|
||||
|
||||
pub async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
|
||||
let inner = self.inner.read().await;
|
||||
inner.gc_table(request).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -260,6 +275,74 @@ impl Inner {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn gc_regions(&self, request: MetaGcRegionsRequest) -> Result<MetaGcResponse> {
|
||||
let req = GcRegionsRequest {
|
||||
header: Some(RequestHeader {
|
||||
protocol_version: 0,
|
||||
member_id: self.id,
|
||||
role: self.role as i32,
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
}),
|
||||
region_ids: request.region_ids,
|
||||
full_file_listing: request.full_file_listing,
|
||||
timeout_secs: request.timeout.as_secs() as u32,
|
||||
};
|
||||
|
||||
let resp: GcRegionsResponse = self
|
||||
.with_retry(
|
||||
"gc_regions",
|
||||
move |mut client| {
|
||||
let mut req = Request::new(req.clone());
|
||||
req.set_timeout(self.timeout);
|
||||
async move { client.gc_regions(req).await.map(|res| res.into_inner()) }
|
||||
},
|
||||
|resp: &GcRegionsResponse| &resp.header,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(MetaGcResponse {
|
||||
processed_regions: resp.processed_regions,
|
||||
need_retry_regions: resp.need_retry_regions,
|
||||
deleted_files: resp.deleted_files,
|
||||
deleted_indexes: resp.deleted_indexes,
|
||||
})
|
||||
}
|
||||
|
||||
async fn gc_table(&self, request: MetaGcTableRequest) -> Result<MetaGcResponse> {
|
||||
let req = GcTableRequest {
|
||||
header: Some(RequestHeader {
|
||||
protocol_version: 0,
|
||||
member_id: self.id,
|
||||
role: self.role as i32,
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
}),
|
||||
catalog_name: request.catalog_name,
|
||||
schema_name: request.schema_name,
|
||||
table_name: request.table_name,
|
||||
full_file_listing: request.full_file_listing,
|
||||
timeout_secs: request.timeout.as_secs() as u32,
|
||||
};
|
||||
|
||||
let resp: GcTableResponse = self
|
||||
.with_retry(
|
||||
"gc_table",
|
||||
move |mut client| {
|
||||
let mut req = Request::new(req.clone());
|
||||
req.set_timeout(self.timeout);
|
||||
async move { client.gc_table(req).await.map(|res| res.into_inner()) }
|
||||
},
|
||||
|resp: &GcTableResponse| &resp.header,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(MetaGcResponse {
|
||||
processed_regions: resp.processed_regions,
|
||||
need_retry_regions: resp.need_retry_regions,
|
||||
deleted_files: resp.deleted_files,
|
||||
deleted_indexes: resp.deleted_indexes,
|
||||
})
|
||||
}
|
||||
|
||||
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
|
||||
let mut req = QueryProcedureRequest {
|
||||
pid: Some(ProcedureId { key: pid.into() }),
|
||||
@@ -333,10 +416,11 @@ mod tests {
|
||||
use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer};
|
||||
use api::v1::meta::procedure_service_server::{ProcedureService, ProcedureServiceServer};
|
||||
use api::v1::meta::{
|
||||
AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, HeartbeatRequest,
|
||||
HeartbeatResponse, MigrateRegionRequest, MigrateRegionResponse, Peer,
|
||||
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
|
||||
QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role,
|
||||
AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, GcRegionsRequest,
|
||||
GcRegionsResponse, GcTableRequest, GcTableResponse, HeartbeatRequest, HeartbeatResponse,
|
||||
MigrateRegionRequest, MigrateRegionResponse, Peer, ProcedureDetailRequest,
|
||||
ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ReconcileRequest,
|
||||
ReconcileResponse, ResponseHeader, Role,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use common_error::status_code::StatusCode;
|
||||
@@ -435,6 +519,20 @@ mod tests {
|
||||
) -> Result<Response<ProcedureDetailResponse>, Status> {
|
||||
Err(Status::unimplemented("details is not used in this test"))
|
||||
}
|
||||
|
||||
async fn gc_regions(
|
||||
&self,
|
||||
_request: Request<GcRegionsRequest>,
|
||||
) -> Result<Response<GcRegionsResponse>, Status> {
|
||||
Err(Status::unimplemented("gc_regions is not used in this test"))
|
||||
}
|
||||
|
||||
async fn gc_table(
|
||||
&self,
|
||||
_request: Request<GcTableRequest>,
|
||||
) -> Result<Response<GcTableResponse>, Status> {
|
||||
Err(Status::unimplemented("gc_table is not used in this test"))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
|
||||
@@ -16,18 +16,28 @@ use std::time::Duration;
|
||||
|
||||
use api::v1::meta::reconcile_request::Target;
|
||||
use api::v1::meta::{
|
||||
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest,
|
||||
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, GcRegionsRequest,
|
||||
GcRegionsResponse, GcTableRequest, GcTableResponse, MigrateRegionRequest,
|
||||
MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
|
||||
QueryProcedureRequest, ReconcileCatalog, ReconcileDatabase, ReconcileRequest,
|
||||
ReconcileResponse, ReconcileTable, ResolveStrategy, procedure_service_server,
|
||||
};
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::procedure_executor::ExecutorContext;
|
||||
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
|
||||
use common_meta::rpc::procedure;
|
||||
use common_meta::rpc::procedure::{
|
||||
self, GcRegionsRequest as MetaGcRegionsRequest, GcResponse,
|
||||
GcTableRequest as MetaGcTableRequest,
|
||||
};
|
||||
use common_procedure::{ProcedureWithId, watcher};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use table::table_reference::TableReference;
|
||||
use tonic::Request;
|
||||
|
||||
use crate::error::{TableMetadataManagerSnafu, TableNotFoundSnafu};
|
||||
use crate::gc::{BatchGcProcedure, Region2Peers};
|
||||
use crate::metasrv::Metasrv;
|
||||
use crate::procedure::region_migration::manager::{
|
||||
RegionMigrationProcedureTask, RegionMigrationTriggerReason,
|
||||
@@ -240,4 +250,192 @@ impl procedure_service_server::ProcedureService for Metasrv {
|
||||
metas,
|
||||
)))
|
||||
}
|
||||
|
||||
async fn gc_regions(
|
||||
&self,
|
||||
request: Request<GcRegionsRequest>,
|
||||
) -> GrpcResult<GcRegionsResponse> {
|
||||
check_leader!(self, request, GcRegionsResponse, "`gc_regions`");
|
||||
|
||||
let GcRegionsRequest {
|
||||
header,
|
||||
region_ids,
|
||||
full_file_listing,
|
||||
timeout_secs,
|
||||
} = request.into_inner();
|
||||
|
||||
let _header = header.context(error::MissingRequestHeaderSnafu)?;
|
||||
|
||||
let response = self
|
||||
.handle_gc_regions(MetaGcRegionsRequest {
|
||||
region_ids,
|
||||
full_file_listing,
|
||||
timeout: Duration::from_secs(timeout_secs as u64),
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Response::new(gc_response_to_regions_pb(response)))
|
||||
}
|
||||
|
||||
async fn gc_table(&self, request: Request<GcTableRequest>) -> GrpcResult<GcTableResponse> {
|
||||
check_leader!(self, request, GcTableResponse, "`gc_table`");
|
||||
|
||||
let GcTableRequest {
|
||||
header,
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
full_file_listing,
|
||||
timeout_secs,
|
||||
} = request.into_inner();
|
||||
|
||||
let _header = header.context(error::MissingRequestHeaderSnafu)?;
|
||||
|
||||
let response = self
|
||||
.handle_gc_table(MetaGcTableRequest {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
full_file_listing,
|
||||
timeout: Duration::from_secs(timeout_secs as u64),
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Response::new(gc_response_to_table_pb(response)))
|
||||
}
|
||||
}
|
||||
|
||||
impl Metasrv {
|
||||
async fn handle_gc_regions(&self, request: MetaGcRegionsRequest) -> error::Result<GcResponse> {
|
||||
let region_ids = request
|
||||
.region_ids
|
||||
.into_iter()
|
||||
.map(RegionId::from_u64)
|
||||
.collect::<Vec<_>>();
|
||||
let report = self
|
||||
.run_gc_procedure(
|
||||
region_ids.clone(),
|
||||
request.full_file_listing,
|
||||
request.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(gc_report_to_response(&report, region_ids.len() as u64))
|
||||
}
|
||||
|
||||
async fn handle_gc_table(&self, request: MetaGcTableRequest) -> error::Result<GcResponse> {
|
||||
let table_name_key = TableNameKey::new(
|
||||
&request.catalog_name,
|
||||
&request.schema_name,
|
||||
&request.table_name,
|
||||
);
|
||||
|
||||
let table_metadata_manager: &TableMetadataManagerRef = self.table_metadata_manager();
|
||||
let table_id = table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(table_name_key)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.context(TableNotFoundSnafu {
|
||||
name: request.table_name.clone(),
|
||||
})?
|
||||
.table_id();
|
||||
|
||||
let (_phy_table_id, route) = table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_physical_table_route(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
let region_ids = route
|
||||
.region_routes
|
||||
.iter()
|
||||
.map(|r| r.region.id)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let report = self
|
||||
.run_gc_procedure(
|
||||
region_ids.clone(),
|
||||
request.full_file_listing,
|
||||
request.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(gc_report_to_response(&report, region_ids.len() as u64))
|
||||
}
|
||||
|
||||
async fn run_gc_procedure(
|
||||
&self,
|
||||
region_ids: Vec<RegionId>,
|
||||
full_file_listing: bool,
|
||||
timeout: Duration,
|
||||
) -> error::Result<store_api::storage::GcReport> {
|
||||
let procedure = BatchGcProcedure::new(
|
||||
self.mailbox().clone(),
|
||||
self.table_metadata_manager().clone(),
|
||||
self.options().grpc.server_addr.clone(),
|
||||
region_ids,
|
||||
full_file_listing,
|
||||
timeout,
|
||||
Region2Peers::new(),
|
||||
);
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
let mut watcher = self
|
||||
.procedure_manager()
|
||||
.submit(procedure_with_id)
|
||||
.await
|
||||
.context(error::SubmitProcedureSnafu)?;
|
||||
|
||||
let res = watcher::wait(&mut watcher)
|
||||
.await
|
||||
.context(error::WaitProcedureSnafu)?
|
||||
.with_context(|| error::UnexpectedSnafu {
|
||||
violated: "GC procedure completed but returned no result".to_string(),
|
||||
})?;
|
||||
|
||||
BatchGcProcedure::cast_result(res)
|
||||
}
|
||||
}
|
||||
|
||||
fn gc_report_to_response(
|
||||
report: &store_api::storage::GcReport,
|
||||
processed_regions: u64,
|
||||
) -> GcResponse {
|
||||
let deleted_files = report.deleted_files.values().map(|v| v.len() as u64).sum();
|
||||
let deleted_indexes = report
|
||||
.deleted_indexes
|
||||
.values()
|
||||
.map(|v| v.len() as u64)
|
||||
.sum();
|
||||
GcResponse {
|
||||
processed_regions,
|
||||
need_retry_regions: report
|
||||
.need_retry_regions
|
||||
.iter()
|
||||
.map(|id| id.as_u64())
|
||||
.collect(),
|
||||
deleted_files,
|
||||
deleted_indexes,
|
||||
}
|
||||
}
|
||||
|
||||
fn gc_response_to_regions_pb(resp: GcResponse) -> GcRegionsResponse {
|
||||
GcRegionsResponse {
|
||||
processed_regions: resp.processed_regions,
|
||||
need_retry_regions: resp.need_retry_regions,
|
||||
deleted_files: resp.deleted_files,
|
||||
deleted_indexes: resp.deleted_indexes,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn gc_response_to_table_pb(resp: GcResponse) -> GcTableResponse {
|
||||
GcTableResponse {
|
||||
processed_regions: resp.processed_regions,
|
||||
need_retry_regions: resp.need_retry_regions,
|
||||
deleted_files: resp.deleted_files,
|
||||
deleted_indexes: resp.deleted_indexes,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,9 @@ use common_error::ext::BoxedError;
|
||||
use common_function::handlers::ProcedureServiceHandler;
|
||||
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
|
||||
use common_meta::rpc::procedure::{
|
||||
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
|
||||
GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
|
||||
GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest,
|
||||
ProcedureStateResponse,
|
||||
};
|
||||
use common_query::error as query_error;
|
||||
use common_query::error::Result as QueryResult;
|
||||
@@ -90,4 +92,20 @@ impl ProcedureServiceHandler for ProcedureServiceOperator {
|
||||
fn catalog_manager(&self) -> &CatalogManagerRef {
|
||||
&self.catalog_manager
|
||||
}
|
||||
|
||||
async fn gc_regions(&self, request: MetaGcRegionsRequest) -> QueryResult<MetaGcResponse> {
|
||||
self.procedure_executor
|
||||
.gc_regions(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(query_error::ProcedureServiceSnafu)
|
||||
}
|
||||
|
||||
async fn gc_table(&self, request: MetaGcTableRequest) -> QueryResult<MetaGcResponse> {
|
||||
self.procedure_executor
|
||||
.gc_table(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(query_error::ProcedureServiceSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user