feat: gc scheduler ctx&procedure (#7252)

* feat: gc ctx&procedure

Signed-off-by: discord9 <discord9@163.com>

* fix: handle region not found case

Signed-off-by: discord9 <discord9@163.com>

* docs: more explanation & TODOs

Signed-off-by: discord9 <discord9@163.com>

* chore: address review comments

Signed-off-by: discord9 <discord9@163.com>

* chore: add timing for region gc

Signed-off-by: discord9 <discord9@163.com>

* fix: explain why loader for gc region should fail

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Author: discord9
Date: 2025-11-19 16:35:17 +08:00
Committed by: GitHub
Parent: 5d8819e7af
Commit: e59612043d
16 changed files with 709 additions and 80 deletions

Cargo.lock

@@ -7440,6 +7440,7 @@ dependencies = [
"lazy_static",
"local-ip-address",
"once_cell",
"ordered-float 4.6.0",
"parking_lot 0.12.4",
"prometheus",
"prost 0.13.5",


@@ -246,6 +246,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Loader for {type_name} is not implemented: {reason}"))]
ProcedureLoaderNotImplemented {
#[snafu(implicit)]
location: Location,
type_name: String,
reason: String,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -266,7 +274,8 @@ impl ErrorExt for Error {
Error::ToJson { .. }
| Error::DeleteState { .. }
| Error::FromJson { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
| Error::WaitWatcher { .. }
| Error::ProcedureLoaderNotImplemented { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }


@@ -40,7 +40,7 @@ impl InstructionHandler for GetFileRefsHandler {
}));
};
match mito_engine
.get_snapshot_of_unmanifested_refs(get_file_refs.region_ids)
.get_snapshot_of_file_refs(get_file_refs.region_ids)
.await
{
Ok(all_file_refs) => {


@@ -101,20 +101,6 @@ impl GcRegionsHandler {
// always use the smallest region id on the datanode as the target region id
region_ids.sort_by_key(|r| r.region_number());
ensure!(
region_ids.windows(2).all(|w| {
let t1 = w[0].table_id();
let t2 = w[1].table_id();
t1 == t2
}),
InvalidGcArgsSnafu {
msg: format!(
"Regions to GC should belong to the same table, found: {:?}",
region_ids
),
}
);
let mito_engine = ctx
.region_server
.mito_engine()
@@ -153,17 +139,14 @@ impl GcRegionsHandler {
})?
.access_layer();
// if a region happens to be dropped after the gc scheduler sent the gc instruction
// but before this point, handle it gracefully (it is fine for a region to be dropped
// after the GC worker has started). A region missing here can only be the drop
// table/database case, since region migration is prevented by the lock in the gc procedure.
// TODO(discord9): add integration test for this drop case
let mito_regions = region_ids
.iter()
.map(|rid| {
mito_engine
.find_region(*rid)
.map(|r| (*rid, r))
.with_context(|| InvalidGcArgsSnafu {
msg: format!("Region {} not found on datanode", rid),
})
})
.collect::<Result<_>>()?;
.filter_map(|rid| mito_engine.find_region(*rid).map(|r| (*rid, r)))
.collect();
let cache_manager = mito_engine.cache_manager();


@@ -62,6 +62,7 @@ hyper-util = { workspace = true, features = ["tokio"] }
itertools.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
ordered-float.workspace = true
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true


@@ -987,8 +987,12 @@ pub enum Error {
impl Error {
/// Returns `true` if the error is retryable.
pub fn is_retryable(&self) -> bool {
matches!(self, Error::RetryLater { .. })
|| matches!(self, Error::RetryLaterWithSource { .. })
matches!(
self,
Error::RetryLater { .. }
| Error::RetryLaterWithSource { .. }
| Error::MailboxTimeout { .. }
)
}
}
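
With MailboxTimeout now classified as retryable, callers can loop on is_retryable(). A minimal caller-side sketch, assuming this crate's Result alias and a tokio runtime; the retry helper and backoff interval are illustrative, not part of this change:

// Illustrative only: retry while the error is classified as retryable
// (which, after this change, includes mailbox timeouts).
async fn with_retries<T, F, Fut>(mut op: F, max_retries: usize) -> Result<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if e.is_retryable() && attempt < max_retries => {
                attempt += 1;
                // simple linear backoff; the interval is made up for this sketch
                tokio::time::sleep(std::time::Duration::from_millis(100 * attempt as u64)).await;
            }
            Err(e) => return Err(e),
        }
    }
}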

src/meta-srv/src/gc.rs

@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(discord9): remove this once gc scheduler is fully merged
#![allow(unused)]
use std::collections::{HashMap, HashSet};
use common_meta::peer::Peer;
use store_api::storage::RegionId;
mod candidate;
mod ctx;
mod procedure;
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;
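
A sketch of how a Region2Peers entry is expected to be populated from a table route (leader first, then followers); the peers and region id below are made up for illustration:

// Illustrative only: one region mapped to its leader and follower peers.
use std::collections::HashMap;
use common_meta::peer::Peer;
use store_api::storage::RegionId;

fn example_routes() -> HashMap<RegionId, (Peer, Vec<Peer>)> {
    let mut routes = HashMap::new();
    let region = RegionId::new(1024, 0); // table_id = 1024, region_number = 0
    let leader = Peer::new(1, "dn-1:4001");
    let followers = vec![Peer::new(2, "dn-2:4001"), Peer::new(3, "dn-3:4001")];
    routes.insert(region, (leader, followers));
    routes
}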


@@ -0,0 +1,48 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Instant;
use common_meta::datanode::{RegionManifestInfo, RegionStat};
use common_telemetry::{debug, info};
use ordered_float::OrderedFloat;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::error::Result;
/// Represents a region candidate for GC with its priority score.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct GcCandidate {
pub(crate) region_id: RegionId,
pub(crate) score: OrderedFloat<f64>,
pub(crate) region_stat: RegionStat,
}
impl GcCandidate {
fn new(region_id: RegionId, score: f64, region_stat: RegionStat) -> Self {
Self {
region_id,
score: OrderedFloat(score),
region_stat,
}
}
#[allow(unused)]
fn score_f64(&self) -> f64 {
self.score.into_inner()
}
}
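
The score is wrapped in OrderedFloat because f64 is not Ord, which lets candidates be sorted or max_by_key-ed directly. A minimal ranking sketch (the scoring formula itself is outside this excerpt):

// Illustrative only: OrderedFloat gives a total order over f64 scores,
// so the best candidate can be picked without manual partial_cmp handling.
use ordered_float::OrderedFloat;

fn pick_highest(scores: Vec<(u32, f64)>) -> Option<u32> {
    scores
        .into_iter()
        .max_by_key(|(_region_number, score)| OrderedFloat(*score))
        .map(|(region_number, _)| region_number)
}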

src/meta-srv/src/gc/ctx.rs

@@ -0,0 +1,384 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(discord9): remove this once gc scheduler is fully merged
#![allow(unused)]
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::datanode::RegionStat;
use common_meta::instruction::{
GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
use common_telemetry::{debug, error, warn};
use snafu::{OptionExt as _, ResultExt as _};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
use crate::gc::Region2Peers;
use crate::gc::procedure::GcRegionProcedure;
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
#[async_trait::async_trait]
pub(crate) trait SchedulerCtx: Send + Sync {
async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;
async fn get_table_route(
&self,
table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)>;
async fn get_file_references(
&self,
region_ids: &[RegionId],
region_routes: &Region2Peers,
timeout: Duration,
) -> Result<FileRefsManifest>;
async fn gc_regions(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport>;
}
pub(crate) struct DefaultGcSchedulerCtx {
/// The metadata manager.
pub(crate) table_metadata_manager: TableMetadataManagerRef,
/// Procedure manager.
pub(crate) procedure_manager: ProcedureManagerRef,
/// For getting `RegionStats`.
pub(crate) meta_peer_client: MetaPeerClientRef,
/// The mailbox to send messages.
pub(crate) mailbox: MailboxRef,
/// The server address.
pub(crate) server_addr: String,
}
impl DefaultGcSchedulerCtx {
pub fn try_new(
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
meta_peer_client: MetaPeerClientRef,
mailbox: MailboxRef,
server_addr: String,
) -> Result<Self> {
// register a loader for `GcRegionProcedure` that always fails: GC procedures should be
// retried by the scheduler, not reloaded from storage, but a loader must still be
// registered to avoid an unknown-loader error when deserializing procedures on reboot
procedure_manager
.register_loader(
GcRegionProcedure::TYPE_NAME,
Box::new(move |json| {
common_procedure::error::ProcedureLoaderNotImplementedSnafu {
type_name: GcRegionProcedure::TYPE_NAME.to_string(),
reason:
"GC procedure should be retried by scheduler, not reloaded from storage"
.to_string(),
}
.fail()
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: GcRegionProcedure::TYPE_NAME,
})?;
Ok(Self {
table_metadata_manager,
procedure_manager,
meta_peer_client,
mailbox,
server_addr,
})
}
}
#[async_trait::async_trait]
impl SchedulerCtx for DefaultGcSchedulerCtx {
async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>> {
let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
for (_dn_id, stats) in dn_stats {
let mut stats = stats.stats;
let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
continue;
};
for region_stat in latest_stat.region_stats {
table_to_region_stats
.entry(region_stat.id.table_id())
.or_default()
.push(region_stat);
}
}
Ok(table_to_region_stats)
}
async fn get_table_route(
&self,
table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)> {
self.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.context(TableMetadataManagerSnafu)
}
async fn gc_regions(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport> {
self.gc_regions_inner(
peer,
region_ids,
file_refs_manifest,
full_file_listing,
timeout,
)
.await
}
async fn get_file_references(
&self,
region_ids: &[RegionId],
region_routes: &Region2Peers,
timeout: Duration,
) -> Result<FileRefsManifest> {
debug!("Getting file references for {} regions", region_ids.len());
// Group regions by datanode to minimize RPC calls
let mut datanode_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
for region_id in region_ids {
if let Some((leader, followers)) = region_routes.get(region_id) {
datanode_regions
.entry(leader.clone())
.or_default()
.push(*region_id);
// also send to follower regions, since a query may be running on a follower and holding file refs
for follower in followers {
datanode_regions
.entry(follower.clone())
.or_default()
.push(*region_id);
}
} else {
return error::UnexpectedSnafu {
violated: format!(
"region_routes: {region_routes:?} does not contain region_id: {region_id}",
),
}
.fail();
}
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<FileId>> = HashMap::new();
let mut all_manifest_versions = HashMap::new();
for (peer, regions) in datanode_regions {
match self
.send_get_file_refs_instruction(&peer, &regions, timeout)
.await
{
Ok(manifest) => {
// TODO(discord9): if another datanode provides file refs for a region but no manifest version,
// is it correct to merge manifest_version directly?
// FIXME: how should manifest versions from follower regions be merged?
for (region_id, file_refs) in manifest.file_refs {
all_file_refs
.entry(region_id)
.or_default()
.extend(file_refs);
}
// the region manifest version should be the smallest among all peers, so outdated regions can be detected
for (region_id, version) in manifest.manifest_version {
let entry = all_manifest_versions.entry(region_id).or_insert(version);
*entry = (*entry).min(version);
}
}
Err(e) => {
warn!(
"Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
peer, e
);
// Continue processing other datanodes instead of failing the entire operation
continue;
}
}
}
Ok(FileRefsManifest {
file_refs: all_file_refs,
manifest_version: all_manifest_versions,
})
}
}
impl DefaultGcSchedulerCtx {
async fn gc_regions_inner(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport> {
debug!(
"Sending GC instruction to datanode {} for {} regions (full_file_listing: {})",
peer,
region_ids.len(),
full_file_listing
);
let gc_regions = GcRegions {
regions: region_ids.to_vec(),
file_refs_manifest: file_refs_manifest.clone(),
full_file_listing,
};
let procedure = GcRegionProcedure::new(
self.mailbox.clone(),
self.server_addr.clone(),
peer,
gc_regions,
format!("GC for {} regions", region_ids.len()),
timeout,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let id = procedure_with_id.id;
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu)?;
let res = watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu)?
.with_context(|| error::UnexpectedSnafu {
violated: format!(
"GC procedure {id} successfully completed but no result returned"
),
})?;
let gc_report = GcRegionProcedure::cast_result(res)?;
Ok(gc_report)
}
/// TODO(discord9): also support reading the manifests of related regions for file refs
/// (currently only active FileHandles are read)
async fn send_get_file_refs_instruction(
&self,
peer: &Peer,
region_ids: &[RegionId],
timeout: Duration,
) -> Result<FileRefsManifest> {
debug!(
"Sending GetFileRefs instruction to datanode {} for {} regions",
peer,
region_ids.len()
);
let instruction = Instruction::GetFileRefs(GetFileRefs {
region_ids: region_ids.to_vec(),
});
let reply = self
.send_instruction(peer, instruction, "Get file references", timeout)
.await?;
let InstructionReply::GetFileRefs(GetFileRefsReply {
file_refs_manifest,
success,
error,
}) = reply
else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: format!("{:?}", reply),
reason: "Unexpected reply of the GetFileRefs instruction",
}
.fail();
};
if !success {
return error::UnexpectedSnafu {
violated: format!(
"Failed to get file references from datanode {}: {:?}",
peer, error
),
}
.fail();
}
Ok(file_refs_manifest)
}
async fn send_instruction(
&self,
peer: &Peer,
instruction: Instruction,
description: &str,
timeout: Duration,
) -> Result<InstructionReply> {
let msg = MailboxMessage::json_message(
&format!("{}: {}", description, instruction),
&format!("Metasrv@{}", self.server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let mailbox_rx = self
.mailbox
.send(&Channel::Datanode(peer.id), msg, timeout)
.await?;
match mailbox_rx.await {
Ok(reply_msg) => {
let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
Ok(reply)
}
Err(e) => {
error!(
"Failed to receive reply from datanode {} for {}: {}",
peer, description, e
);
Err(e)
}
}
}
}
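
Note on the min-merge of manifest versions in get_file_references above: when several peers report a version for the same region, keeping the smallest means an outdated peer lowers the recorded version and can be detected, rather than being masked by a fresher peer. A worked sketch with made-up versions:

// Illustrative only: merging per-peer manifest versions keeps the minimum.
use std::collections::HashMap;

fn merge_min(reports: &[HashMap<u64, u64>]) -> HashMap<u64, u64> {
    let mut merged = HashMap::new();
    for report in reports {
        for (&region, &version) in report {
            let entry = merged.entry(region).or_insert(version);
            *entry = (*entry).min(version);
        }
    }
    merged
}

// e.g. the leader reports region 1 at manifest version 7 and a follower at 5:
// the merged version is 5, flagging the follower's manifest as behind.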


@@ -0,0 +1,184 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{self, GcRegions, InstructionReply};
use common_meta::lock_key::RegionLock;
use common_meta::peer::Peer;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use common_telemetry::error;
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::GcReport;
use crate::error::{self, Result, SerializeToJsonSnafu};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
/// TODO(discord9): add another procedure that does both get-file-refs and gc-regions.
pub struct GcRegionProcedure {
mailbox: MailboxRef,
data: GcRegionData,
}
#[derive(Serialize, Deserialize)]
pub struct GcRegionData {
server_addr: String,
peer: Peer,
gc_regions: GcRegions,
description: String,
timeout: Duration,
}
impl GcRegionProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::GcRegionProcedure";
pub fn new(
mailbox: MailboxRef,
server_addr: String,
peer: Peer,
gc_regions: GcRegions,
description: String,
timeout: Duration,
) -> Self {
Self {
mailbox,
data: GcRegionData {
peer,
server_addr,
gc_regions,
description,
timeout,
},
}
}
async fn send_gc_instr(&self) -> Result<GcReport> {
let peer = &self.data.peer;
let instruction = instruction::Instruction::GcRegions(self.data.gc_regions.clone());
let msg = MailboxMessage::json_message(
&format!("{}: {}", self.data.description, instruction),
&format!("Metasrv@{}", self.data.server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let mailbox_rx = self
.mailbox
.send(&Channel::Datanode(peer.id), msg, self.data.timeout)
.await?;
let reply = match mailbox_rx.await {
Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
Err(e) => {
error!(
"Failed to receive reply from datanode {} for {}: {}",
peer, self.data.description, e
);
return Err(e);
}
};
let InstructionReply::GcRegions(reply) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: format!("{:?}", reply),
reason: "Unexpected reply of the GcRegions instruction",
}
.fail();
};
let res = reply.result;
match res {
Ok(report) => Ok(report),
Err(e) => {
error!(
"Datanode {} reported error during GC for regions {:?}: {}",
peer, self.data.gc_regions, e
);
error::UnexpectedSnafu {
violated: format!(
"Datanode {} reported error during GC for regions {:?}: {}",
peer, self.data.gc_regions, e
),
}
.fail()
}
}
}
pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
error::UnexpectedSnafu {
violated: format!(
"Failed to downcast procedure result to GcReport, got {:?}",
std::any::type_name_of_val(&res.as_ref())
),
}
.build()
})
}
}
#[async_trait::async_trait]
impl Procedure for GcRegionProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
// Send the GC instruction to the datanode. This procedure only handles locking and
// sending; results and other kinds of errors are reported back via the oneshot channel.
let reply = self
.send_gc_instr()
.await
.map_err(ProcedureError::external)?;
Ok(Status::done_with_output(reply))
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
/// Lock all regions involved in this GC procedure, so that e.g. region migration
/// cannot run during GC and cause race conditions.
///
/// Only the regions are locked, not the catalog/schema, so this procedure can run
/// concurrently with other procedures (e.g. drop database/table).
/// TODO(discord9): integration test to verify this
fn lock_key(&self) -> LockKey {
let lock_key: Vec<_> = self
.data
.gc_regions
.regions
.iter()
.sorted() // sort to have a deterministic lock order
.map(|id| RegionLock::Read(*id).into())
.collect();
LockKey::new(lock_key)
}
}
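
Sorting the region ids before building the lock key gives every GC procedure the same acquisition order over overlapping region sets, the usual guard against lock-ordering deadlocks. A trivial sketch of the invariant:

// Illustrative only: two procedures over overlapping regions acquire locks
// in the same sorted order, so neither waits on the other in reverse order.
fn lock_order(mut regions: Vec<u64>) -> Vec<u64> {
    regions.sort_unstable();
    regions
}

fn main() {
    assert_eq!(lock_order(vec![3, 1, 2]), vec![1, 2, 3]);
    assert_eq!(lock_order(vec![2, 3, 1]), vec![1, 2, 3]); // same order either way
}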


@@ -25,6 +25,7 @@ pub mod election;
pub mod error;
pub mod events;
mod failure_detector;
pub mod gc;
pub mod handler;
pub mod key;
pub mod metasrv;


@@ -290,7 +290,7 @@ impl MitoEngine {
}
/// Gets all file refs for the given region ids, i.e. all open FileHandles for those regions.
pub async fn get_snapshot_of_unmanifested_refs(
pub async fn get_snapshot_of_file_refs(
&self,
region_ids: impl IntoIterator<Item = RegionId>,
) -> Result<FileRefsManifest> {
@@ -304,9 +304,7 @@ impl MitoEngine {
.filter_map(|region_id| self.find_region(region_id))
.collect();
file_ref_mgr
.get_snapshot_of_unmanifested_refs(regions)
.await
file_ref_mgr.get_snapshot_of_file_refs(regions).await
}
/// Returns true if the specific region exists.


@@ -263,6 +263,7 @@ impl LocalGcWorker {
let mut deleted_files = HashMap::new();
let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
for (region_id, region) in &self.regions {
let per_region_time = std::time::Instant::now();
if region.manifest_ctx.current_state() == RegionRoleState::Follower {
return UnexpectedSnafu {
reason: format!(
@@ -278,10 +279,15 @@ impl LocalGcWorker {
.unwrap_or_else(HashSet::new);
let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
deleted_files.insert(*region_id, files);
debug!(
"GC for region {} took {} secs.",
region_id,
per_region_time.elapsed().as_secs_f32()
);
}
info!(
"LocalGcWorker finished after {} secs.",
now.elapsed().as_secs()
now.elapsed().as_secs_f32()
);
let report = GcReport {
deleted_files,
@@ -323,31 +329,34 @@ impl LocalGcWorker {
debug!("No recently removed files to gc for region {}", region_id);
}
debug!(
"Found {} recently removed files sets for region {}",
recently_removed_files.len(),
region_id
);
let removed_file_cnt = recently_removed_files
.values()
.map(|s| s.len())
.sum::<usize>();
let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
.max(1)
.min(self.opt.max_concurrent_lister_per_gc_job);
let in_used = current_files
let in_used: HashSet<FileId> = current_files
.keys()
.cloned()
.chain(tmp_ref_files.clone().into_iter())
.collect();
let unused_files = self
.list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency)
.list_to_be_deleted_files(region_id, &in_used, recently_removed_files, concurrency)
.await?;
let unused_len = unused_files.len();
let unused_file_cnt = unused_files.len();
debug!(
"Found {} unused files to delete for region {}",
unused_len, region_id
"gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, In-used files: {}, recently removed files: {}, Unused files to delete: {} ",
current_files.len(),
tmp_ref_files.len(),
in_used.len(),
removed_file_cnt,
unused_files.len()
);
// TODO(discord9): for now, ignore async index files as their design is not stable, need to be improved once
@@ -357,7 +366,7 @@ impl LocalGcWorker {
.map(|file_id| (*file_id, *file_id))
.collect();
info!(
debug!(
"Found {} unused index files to delete for region {}",
file_pairs.len(),
region_id
@@ -367,7 +376,7 @@ impl LocalGcWorker {
debug!(
"Successfully deleted {} unused files for region {}",
unused_len, region_id
unused_file_cnt, region_id
);
// TODO(discord9): update region manifest about deleted files
self.update_manifest_removed_files(&region, unused_files.clone())
@@ -398,8 +407,9 @@ impl LocalGcWorker {
region: &MitoRegionRef,
deleted_files: Vec<FileId>,
) -> Result<()> {
let deleted_file_cnt = deleted_files.len();
debug!(
"Trying to update manifest removed files for region {}",
"Trying to update manifest for {deleted_file_cnt} removed files for region {}",
region.region_id()
);
@@ -592,7 +602,7 @@ impl LocalGcWorker {
pub async fn list_to_be_deleted_files(
&self,
region_id: RegionId,
in_used: HashSet<FileId>,
in_used: &HashSet<FileId>,
recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
concurrency: usize,
) -> Result<Vec<FileId>> {
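
On the lister concurrency computed earlier in this file's diff: one lister is budgeted per CONCURRENCY_LIST_PER_FILES files, floored at 1 and capped by max_concurrent_lister_per_gc_job. A worked sketch; the 32-files-per-lister and cap of 8 are assumed values for illustration:

// Illustrative only: concurrency = clamp(files / per_files, 1, cap).
fn lister_concurrency(file_cnt: usize, per_files: usize, cap: usize) -> usize {
    (file_cnt / per_files).max(1).min(cap)
}

fn main() {
    assert_eq!(lister_concurrency(10, 32, 8), 1); // few files: a single lister
    assert_eq!(lister_concurrency(100, 32, 8), 3); // 100 / 32 = 3
    assert_eq!(lister_concurrency(1000, 32, 8), 8); // clamped to the cap
}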


@@ -21,7 +21,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use common_base::readable_size::ReadableSize;
use common_telemetry::{error, info};
use common_telemetry::{debug, error};
use common_time::Timestamp;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
@@ -436,7 +436,7 @@ pub async fn delete_files(
}
}
info!(
debug!(
"Deleted {} files for region {}: {:?}",
deleted_files.len(),
region_id,


@@ -80,56 +80,30 @@ impl FileReferenceManager {
Some(ref_file_set)
}
/// Gets all ref files for the given table id, excluding those already in region manifest.
///
/// It's safe for the manifest version to become outdated by the time the gc worker runs: the gc worker checks the changes between the two versions and acts accordingly, so it can recover the set of files that were truly temporary refs as of the old manifest version.
///
/// TODO(discord9): since a query can only refer to files in the latest manifest at the time it starts, the only real risk is files removed from the manifest between the old version (when refs were read) and the new version (seen by the gc worker). So when its manifest version is outdated, the gc worker should make sure not to delete those files (until the next gc round, which will use the latest manifest version and handle them normally),
/// or perhaps use a two-phase-commit-style process where it proposes a set of files for deletion and then verifies no new references have appeared before committing the delete.
///
/// The gc worker could do this:
/// 1. if it can determine which files were removed between the old and new manifest, it shouldn't delete those files even if they are not in the tmp ref sets; other files can be handled normally (deleted if unused, otherwise kept),
/// and it reports back so the next gc round can handle those files with newer tmp ref file sets.
/// 2. if it can't determine which files were removed between the old and new manifest (possible if a checkpoint just happened),
/// it can do nothing, since it can't tell whether a file is truly unused or the tmp ref file sets simply haven't reported it yet; it reports back and the next gc round handles those files with newer tmp ref file sets.
///
/// Gets all ref files for the given regions, meaning all open FileHandles for those regions
#[allow(unused)]
pub(crate) async fn get_snapshot_of_unmanifested_refs(
pub(crate) async fn get_snapshot_of_file_refs(
&self,
regions: Vec<MitoRegionRef>,
) -> Result<FileRefsManifest> {
let mut ref_files = HashMap::new();
for region_id in regions.iter().map(|r| r.region_id()) {
if let Some(files) = self.ref_file_set(region_id) {
ref_files.insert(region_id, files);
ref_files.insert(region_id, files.into_iter().map(|f| f.file_id).collect());
}
}
let mut in_manifest_files = HashSet::new();
let mut manifest_version = HashMap::new();
for r in &regions {
let manifest = r.manifest_ctx.manifest().await;
let files = manifest.files.keys().cloned().collect::<Vec<_>>();
in_manifest_files.extend(files);
manifest_version.insert(r.region_id(), manifest.manifest_version);
}
let ref_files_excluding_in_manifest = ref_files
.iter()
.map(|(r, f)| {
(
*r,
f.iter()
.filter_map(|f| {
(!in_manifest_files.contains(&f.file_id)).then_some(f.file_id)
})
.collect::<HashSet<_>>(),
)
})
.collect();
// simply return all ref files, no manifest version filtering for now.
Ok(FileRefsManifest {
file_refs: ref_files_excluding_in_manifest,
file_refs: ref_files,
manifest_version,
})
}
@@ -166,7 +140,7 @@ impl FileReferenceManager {
/// If the reference count reaches zero, the file reference will be removed from the manager.
pub fn remove_file(&self, file_meta: &FileMeta) {
let region_id = file_meta.region_id;
let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
let file_ref = FileRef::new(region_id, file_meta.file_id);
let mut remove_table_entry = false;
let mut remove_file_ref = false;


@@ -121,6 +121,9 @@ impl GcReport {
*self_files = dedup.into_iter().collect();
}
self.need_retry_regions.extend(other.need_retry_regions);
// Remove regions that have succeeded from need_retry_regions
self.need_retry_regions
.retain(|region| !self.deleted_files.contains_key(region));
}
}
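
The net effect of the new retain: a region that needed a retry in one report but shows up with deleted files after merging a later report is dropped from the retry set. A simplified stand-in for GcReport to show the behavior (the real merge also dedups per-region file lists):

// Illustrative only: simplified GcReport-like merge.
use std::collections::{HashMap, HashSet};

#[derive(Default)]
struct Report {
    deleted_files: HashMap<u64, Vec<u64>>, // region id -> deleted file ids
    need_retry_regions: HashSet<u64>,
}

impl Report {
    fn merge(&mut self, other: Report) {
        self.deleted_files.extend(other.deleted_files);
        self.need_retry_regions.extend(other.need_retry_regions);
        // a region with deleted files recorded has succeeded; drop it from retries
        let deleted = &self.deleted_files;
        self.need_retry_regions.retain(|r| !deleted.contains_key(r));
    }
}

fn main() {
    let mut first = Report::default();
    first.need_retry_regions.insert(1);
    let mut second = Report::default();
    second.deleted_files.insert(1, vec![42]);
    first.merge(second);
    assert!(first.need_retry_regions.is_empty()); // region 1 succeeded on retry
}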