feat: gc worker only on local regions & test (#7203)

* feat: gc worker only on local region

Signed-off-by: discord9 <discord9@163.com>

* more check

Signed-off-by: discord9 <discord9@163.com>

* chore: stuff

Signed-off-by: discord9 <discord9@163.com>

* fix: ignore async index file for now

Signed-off-by: discord9 <discord9@163.com>

* fix: file removal rate calc

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-11-18 10:45:09 +08:00
committed by GitHub
parent ff2a12a49d
commit 29bbff3c90
32 changed files with 749 additions and 272 deletions

View File

@@ -106,6 +106,37 @@ This mechanism may be too complex to implement at once. We can consider a two-ph
Also, the read replica's manifest version shouldn't lag behind by more than the lingering time of obsolete files; otherwise it might reference files that have already been deleted by the GC worker.
- Need to upload the tmp manifest to object storage, which may introduce additional complexity and potential performance overhead. But since long-running queries are typically infrequent, the performance impact is expected to be minimal.
One potential race condition with region migration is illustrated below:
```mermaid
sequenceDiagram
participant gc_worker as GC Worker (same dn as region 1)
participant region1 as Region 1 (Leader → Follower)
participant region2 as Region 2 (Follower → Leader)
participant region_dir as Region Directory
gc_worker->>region1: Start GC, get region manifest
activate region1
region1-->>gc_worker: Region 1 manifest
deactivate region1
gc_worker->>region_dir: Scan region directory
Note over region1,region2: Region Migration Occurs
region1-->>region2: Downgrade to Follower
region2-->>region1: Becomes Leader
region2->>region_dir: Add new file
gc_worker->>region_dir: Continue scanning
gc_worker-->>region_dir: Discovers new file
Note over gc_worker: New file not in Region 1's manifest
gc_worker->>gc_worker: Mark file as orphan (incorrectly)
```
This could cause the GC worker to incorrectly mark the new file as an orphan and delete it, if the configured lingering time for orphan files (files not referenced anywhere, whether in use or not) is not long enough.
A good enough solution could be to use a lock that prevents the GC worker from running on a region while region migration is happening on that region, and vice versa.
The race condition between the GC worker and repartition also needs to be considered carefully. For now, acquiring locks for both region migration and repartition during the GC worker process could be a simple solution, as sketched below.
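A minimal sketch of such mutual exclusion, assuming a hypothetical per-region lock registry shared by the GC worker and the migration/repartition procedures (`RegionProcedureLocks` and the function names below are illustrative, not actual GreptimeDB APIs):
```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::Mutex;

/// Hypothetical registry of per-region locks shared by the GC worker,
/// region migration and repartition procedures (illustrative only).
#[derive(Default, Clone)]
struct RegionProcedureLocks {
    locks: Arc<Mutex<HashMap<u64, Arc<Mutex<()>>>>>,
}

impl RegionProcedureLocks {
    /// Returns the lock guarding all exclusive procedures on `region_id`.
    async fn lock_for(&self, region_id: u64) -> Arc<Mutex<()>> {
        let mut locks = self.locks.lock().await;
        locks.entry(region_id).or_default().clone()
    }
}

/// GC holds the region lock for the whole scan-and-delete phase, so a
/// concurrent migration (or repartition) cannot add files that the scan
/// would misclassify as orphans, and vice versa.
async fn gc_region(locks: &RegionProcedureLocks, region_id: u64) {
    let lock = locks.lock_for(region_id).await;
    let _guard = lock.lock().await;
    // read manifest, scan region dir, delete unused files ...
}

async fn migrate_region(locks: &RegionProcedureLocks, region_id: u64) {
    let lock = locks.lock_for(region_id).await;
    let _guard = lock.lock().await;
    // downgrade leader, catch up follower, switch roles ...
}
```
Holding one lock per region keeps the scheme simple; a production design would more likely reuse the existing procedure-level locking in metasrv rather than a datanode-local map.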
## Conclusion and Rationale

View File

@@ -132,6 +132,8 @@ pub enum RegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
/// Number of removed files recorded in the manifest's `removed_files` field.
file_removed_cnt: u64,
},
Metric {
data_manifest_version: u64,
@@ -271,9 +273,11 @@ impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
store_api::region_engine::RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
file_removed_cnt,
} => RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
file_removed_cnt,
},
store_api::region_engine::RegionManifestInfo::Metric {
data_manifest_version,

View File

@@ -182,7 +182,7 @@ fn alter_request_handler(_peer: Peer, request: RegionRequest) -> Result<RegionRe
let region_id = RegionId::from(req.region_id);
response.extensions.insert(
MANIFEST_INFO_EXTENSION_KEY.to_string(),
RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::mito(1, 1))])
RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::mito(1, 1, 0))])
.unwrap(),
);
response.extensions.insert(

View File

@@ -433,7 +433,7 @@ impl Display for GetFileRefs {
/// Instruction to trigger garbage collection for a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
/// The region ID to perform GC on.
/// The region IDs to perform GC on. Only regions that are currently on the given datanode can be garbage collected; regions not on the datanode will report errors.
pub regions: Vec<RegionId>,
/// The file references manifest containing temporary file references.
pub file_refs_manifest: FileRefsManifest,

View File

@@ -67,6 +67,7 @@ impl LeaderRegionManifestInfo {
RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
file_removed_cnt: _,
} => LeaderRegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,

View File

@@ -39,7 +39,6 @@ impl InstructionHandler for GetFileRefsHandler {
error: Some("MitoEngine not found".to_string()),
}));
};
match mito_engine
.get_snapshot_of_unmanifested_refs(get_file_refs.region_ids)
.await

View File

@@ -15,7 +15,7 @@
use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
use common_telemetry::{debug, warn};
use mito2::gc::LocalGcWorker;
use snafu::{OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{FileRefsManifest, RegionId};
use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
@@ -35,20 +35,6 @@ impl InstructionHandler for GcRegionsHandler {
let region_ids = gc_regions.regions.clone();
debug!("Received gc regions instruction: {:?}", region_ids);
let is_same_table = region_ids.windows(2).all(|w| {
let t1 = w[0].table_id();
let t2 = w[1].table_id();
t1 == t2
});
if !is_same_table {
return Some(InstructionReply::GcRegions(GcRegionsReply {
result: Err(format!(
"Regions to GC should belong to the same table, found: {:?}",
region_ids
)),
}));
}
let (region_id, gc_worker) = match self
.create_gc_worker(
ctx,
@@ -103,6 +89,8 @@ impl InstructionHandler for GcRegionsHandler {
}
impl GcRegionsHandler {
/// Create a GC worker for the given region IDs.
/// Returns the first region ID (after sorting the given region IDs) and the GC worker.
async fn create_gc_worker(
&self,
ctx: &HandlerContext,
@@ -112,22 +100,51 @@ impl GcRegionsHandler {
) -> Result<(RegionId, LocalGcWorker)> {
// always use the smallest region id on datanode as the target region id
region_ids.sort_by_key(|r| r.region_number());
ensure!(
region_ids.windows(2).all(|w| {
let t1 = w[0].table_id();
let t2 = w[1].table_id();
t1 == t2
}),
InvalidGcArgsSnafu {
msg: format!(
"Regions to GC should belong to the same table, found: {:?}",
region_ids
),
}
);
let mito_engine = ctx
.region_server
.mito_engine()
.with_context(|| UnexpectedSnafu {
violated: "MitoEngine not found".to_string(),
})?;
let region_id = *region_ids.first().with_context(|| UnexpectedSnafu {
violated: "No region ids provided".to_string(),
let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu {
msg: "No region ids provided".to_string(),
})?;
let mito_config = mito_engine.mito_config();
// also need to ensure all regions are on this datanode
ensure!(
region_ids
.iter()
.all(|rid| mito_engine.find_region(*rid).is_some()),
InvalidGcArgsSnafu {
msg: format!(
"Some regions are not on current datanode:{:?}",
region_ids
.iter()
.filter(|rid| mito_engine.find_region(**rid).is_none())
.collect::<Vec<_>>()
),
}
);
// Find the access layer from one of the regions that exists on this datanode
let access_layer = region_ids
.iter()
.find_map(|rid| mito_engine.find_region(*rid))
let access_layer = mito_engine
.find_region(region_id)
.with_context(|| InvalidGcArgsSnafu {
msg: format!(
"None of the regions is on current datanode:{:?}",
@@ -136,14 +153,25 @@ impl GcRegionsHandler {
})?
.access_layer();
let mito_regions = region_ids
.iter()
.map(|rid| {
mito_engine
.find_region(*rid)
.map(|r| (*rid, r))
.with_context(|| InvalidGcArgsSnafu {
msg: format!("Region {} not found on datanode", rid),
})
})
.collect::<Result<_>>()?;
let cache_manager = mito_engine.cache_manager();
let gc_worker = LocalGcWorker::try_new(
access_layer.clone(),
Some(cache_manager),
region_ids.into_iter().collect(),
Default::default(),
mito_config.clone().into(),
mito_regions,
mito_engine.mito_config().gc.clone(),
file_ref_manifest.clone(),
&mito_engine.gc_limiter(),
full_file_listing,

View File

@@ -518,7 +518,7 @@ impl RegionServer {
let manifest_info = match manifest_info {
ManifestInfo::MitoManifestInfo(info) => {
RegionManifestInfo::mito(info.data_manifest_version, 0)
RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
}
ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
info.data_manifest_version,

View File

@@ -73,6 +73,7 @@ mod tests {
region_manifest: RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id: 0,
file_removed_cnt: 0,
},
rcus: 0,
wcus: 0,

View File

@@ -102,6 +102,7 @@ mod tests {
region_manifest: RegionManifestInfo::Mito {
manifest_version: 0,
flushed_entry_id: 0,
file_removed_cnt: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,

View File

@@ -294,6 +294,7 @@ mod tests {
region_manifest: RegionManifestInfo::Mito {
manifest_version: 1,
flushed_entry_id: 100,
file_removed_cnt: 0,
},
written_bytes,
data_topic_latest_entry_id: 200,

View File

@@ -173,6 +173,7 @@ mod test {
region_manifest: RegionManifestInfo::Mito {
manifest_version: 0,
flushed_entry_id: 0,
file_removed_cnt: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,

View File

@@ -195,6 +195,7 @@ mod tests {
region_manifest: RegionManifestInfo::Mito {
manifest_version: 0,
flushed_entry_id: 0,
file_removed_cnt: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
@@ -224,6 +225,7 @@ mod tests {
region_manifest: RegionManifestInfo::Mito {
manifest_version: 0,
flushed_entry_id: 0,
file_removed_cnt: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
@@ -253,6 +255,7 @@ mod tests {
region_manifest: RegionManifestInfo::Mito {
manifest_version: 0,
flushed_entry_id: 0,
file_removed_cnt: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,

View File

@@ -45,7 +45,7 @@ impl MetricEngineInner {
.metadata_flushed_entry_id()
.unwrap_or_default();
let metadata_region_manifest =
RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0);
let metadata_synced = self
.mito
.sync_region(metadata_region_id, metadata_region_manifest)
@@ -57,7 +57,7 @@ impl MetricEngineInner {
let data_manifest_version = manifest_info.data_manifest_version();
let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
let data_region_manifest =
RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0);
let data_synced = self
.mito

View File

@@ -1111,9 +1111,8 @@ mod tests {
checkpoint_distance: 10,
remove_file_options: Default::default(),
},
Default::default(),
Default::default(),
FormatType::PrimaryKey,
&Default::default(),
)
.await
.unwrap();

View File

@@ -172,21 +172,16 @@ pub async fn open_compaction_region(
compress_type: manifest_compress_type(mito_config.compress_manifest),
checkpoint_distance: mito_config.manifest_checkpoint_distance,
remove_file_options: RemoveFileOptions {
keep_count: mito_config.experimental_manifest_keep_removed_file_count,
keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
enable_gc: mito_config.gc.enable,
},
};
RegionManifestManager::open(
region_manifest_options,
Default::default(),
Default::default(),
)
.await?
.context(EmptyRegionDirSnafu {
region_id: req.region_id,
region_dir: &region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
})?
RegionManifestManager::open(region_manifest_options, &Default::default())
.await?
.with_context(|| EmptyRegionDirSnafu {
region_id: req.region_id,
region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
})?
};
let manifest = manifest_manager.manifest();

View File

@@ -31,7 +31,7 @@ mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
pub(crate) mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
@@ -297,15 +297,12 @@ impl MitoEngine {
let file_ref_mgr = self.file_ref_manager();
let region_ids = region_ids.into_iter().collect::<Vec<_>>();
// Convert region IDs to MitoRegionRef objects, error if any region doesn't exist
// Convert region IDs to MitoRegionRef objects, ignoring regions that do not exist on the current datanode,
// as regions on other datanodes are not managed by this engine.
let regions: Vec<MitoRegionRef> = region_ids
.into_iter()
.map(|region_id| {
self.find_region(region_id)
.with_context(|| RegionNotFoundSnafu { region_id })
})
.collect::<Result<_>>()?;
.filter_map(|region_id| self.find_region(region_id))
.collect();
file_ref_mgr
.get_snapshot_of_unmanifested_refs(regions)
@@ -369,7 +366,11 @@ impl MitoEngine {
}
/// Returns a scanner to scan for `request`.
async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
pub(crate) async fn scanner(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner().await
}

View File

@@ -199,7 +199,7 @@ async fn test_alter_region_with_format(flat_format: bool) {
assert_eq!(manifests.len(), 1);
let (return_region_id, manifest) = manifests.remove(0);
assert_eq!(return_region_id, region_id);
assert_eq!(manifest, RegionManifestInfo::mito(2, 1));
assert_eq!(manifest, RegionManifestInfo::mito(2, 1, 0));
let column_metadatas =
parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY).unwrap();
assert_column_metadatas(

View File

@@ -151,7 +151,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
scan_check(&follower_engine, region_id, expected, 0, 0).await;
// Returns error since the max manifest is 1
let manifest_info = RegionManifestInfo::mito(2, 0);
let manifest_info = RegionManifestInfo::mito(2, 0, 0);
let err = follower_engine
.sync_region(region_id, manifest_info)
.await
@@ -159,7 +159,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
let err = err.as_any().downcast_ref::<Error>().unwrap();
assert_matches!(err, Error::InstallManifestTo { .. });
let manifest_info = RegionManifestInfo::mito(1, 0);
let manifest_info = RegionManifestInfo::mito(1, 0, 0);
follower_engine
.sync_region(region_id, manifest_info)
.await
@@ -264,7 +264,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) {
scan_check(&follower_engine, region_id, expected, 0, 0).await;
// Sync the region from the leader engine to the follower engine
let manifest_info = RegionManifestInfo::mito(2, 0);
let manifest_info = RegionManifestInfo::mito(2, 0, 0);
follower_engine
.sync_region(region_id, manifest_info)
.await

View File

@@ -21,7 +21,7 @@
//! `unknown files`: files that are not recorded in the manifest, usually because a saved checkpoint has removed the actions before the checkpoint.
//!
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
@@ -30,7 +30,7 @@ use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt as _, ensure};
use snafu::{OptionExt, ResultExt as _};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;
@@ -39,15 +39,16 @@ use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
Result, TooManyGcJobsSnafu, UnexpectedSnafu,
DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics::GC_DEL_FILE_CNT;
use crate::region::opener::new_manifest_dir;
use crate::manifest::action::RegionManifest;
use crate::metrics::GC_DELETE_FILE_CNT;
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::delete_files;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::location::{self};
#[cfg(test)]
mod worker_test;
/// Limit the amount of concurrent GC jobs on the datanode
pub struct GcLimiter {
@@ -95,16 +96,18 @@ impl GcLimiter {
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
/// Whether GC is enabled.
pub enable: bool,
/// Lingering time before deleting files.
/// Should be long enough to allow long-running queries to finish.
/// If set to None, then unused files will be deleted immediately.
///
/// TODO(discord9): long running queries should actively write tmp manifest files
/// to prevent deletion of files they are using.
#[serde(with = "humantime_serde")]
pub lingering_time: Duration,
pub lingering_time: Option<Duration>,
/// Lingering time before deleting unknown files (files with an undetermined expel time).
/// The expel time is the time when the file is considered removed, i.e. removed from the manifest.
/// This should only occur rarely, as the manifest keeps track of removed files in its `removed_files` field
@@ -124,10 +127,10 @@ impl Default for GcConfig {
fn default() -> Self {
Self {
enable: false,
// expect long running queries to be finished within a reasonable time
lingering_time: Duration::from_secs(60 * 5),
// 6 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer
unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
// expect long-running queries to finish (or at least be able to notify that they're using a deleted file) within a reasonable time
lingering_time: Some(Duration::from_secs(60)),
// 1 hour; for files with an unknown expel time (the time when the file was removed from the manifest). This should rarely happen, so we can keep them longer
unknown_file_lingering_time: Duration::from_secs(60 * 60),
max_concurrent_lister_per_gc_job: 32,
max_concurrent_gc_job: 4,
}
@@ -137,10 +140,9 @@ impl Default for GcConfig {
pub struct LocalGcWorker {
pub(crate) access_layer: AccessLayerRef,
pub(crate) cache_manager: Option<CacheManagerRef>,
pub(crate) manifest_mgrs: HashMap<RegionId, RegionManifestManager>,
pub(crate) regions: BTreeMap<RegionId, MitoRegionRef>,
/// GC configuration, including the lingering time before deleting files.
pub(crate) opt: GcConfig,
pub(crate) manifest_open_config: ManifestOpenConfig,
/// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
///
/// Also contains manifest versions of regions when the tmp ref files are generated.
@@ -186,58 +188,37 @@ impl LocalGcWorker {
pub async fn try_new(
access_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
regions_to_gc: BTreeSet<RegionId>,
regions_to_gc: BTreeMap<RegionId, MitoRegionRef>,
opt: GcConfig,
manifest_open_config: ManifestOpenConfig,
file_ref_manifest: FileRefsManifest,
limiter: &GcLimiterRef,
full_file_listing: bool,
) -> Result<Self> {
let table_id = regions_to_gc
.first()
.context(UnexpectedSnafu {
reason: "Expect at least one region, found none",
})?
.table_id();
let permit = limiter.permit()?;
let mut zelf = Self {
Ok(Self {
access_layer,
cache_manager,
manifest_mgrs: HashMap::new(),
regions: regions_to_gc,
opt,
manifest_open_config,
file_ref_manifest,
_permit: permit,
full_file_listing,
};
// dedup just in case
for region_id in regions_to_gc {
ensure!(
region_id.table_id() == table_id,
UnexpectedSnafu {
reason: format!(
"All regions should belong to the same table, found region {} and table {}",
region_id, table_id
),
}
);
let mgr = zelf.open_mgr_for(region_id).await?;
zelf.manifest_mgrs.insert(region_id, mgr);
}
Ok(zelf)
})
}
/// Get tmp ref files for all current regions
///
/// Outdated regions are added to `outdated_regions` set
/// Outdated regions are added to the `outdated_regions` set, which means their manifest version in
/// `self.file_ref_manifest` is older than the current manifest version on the datanode,
/// so metasrv needs to retry GC for them later with updated tmp ref files.
pub async fn read_tmp_ref_files(
&self,
outdated_regions: &mut HashSet<RegionId>,
) -> Result<HashMap<RegionId, HashSet<FileId>>> {
for (region_id, region_mgr) in &self.manifest_mgrs {
let current_version = region_mgr.manifest().manifest_version;
// verify manifest version before reading tmp ref files
for (region_id, mito_region) in &self.regions {
let current_version = mito_region.manifest_ctx.manifest_version().await;
if &current_version
> self
.file_ref_manifest
@@ -253,7 +234,6 @@ impl LocalGcWorker {
outdated_regions.insert(*region_id);
}
}
// TODO(discord9): verify manifest version before reading tmp ref files
let mut tmp_ref_files = HashMap::new();
for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
@@ -282,15 +262,22 @@ impl LocalGcWorker {
let mut outdated_regions = HashSet::new();
let mut deleted_files = HashMap::new();
let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
for region_id in self.manifest_mgrs.keys() {
debug!("Doing gc for region {}", region_id);
for (region_id, region) in &self.regions {
if region.manifest_ctx.current_state() == RegionRoleState::Follower {
return UnexpectedSnafu {
reason: format!(
"Region {} is in Follower state, should not run GC on follower regions",
region_id
),
}
.fail();
}
let tmp_ref_files = tmp_ref_files
.get(region_id)
.cloned()
.unwrap_or_else(HashSet::new);
let files = self.do_region_gc(*region_id, &tmp_ref_files).await?;
let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
deleted_files.insert(*region_id, files);
debug!("Gc for region {} finished", region_id);
}
info!(
"LocalGcWorker finished after {} secs.",
@@ -319,19 +306,17 @@ impl LocalGcWorker {
/// to avoid deleting files that are still needed.
pub async fn do_region_gc(
&self,
region_id: RegionId,
region: MitoRegionRef,
tmp_ref_files: &HashSet<FileId>,
) -> Result<Vec<FileId>> {
let region_id = region.region_id();
debug!("Doing gc for region {}", region_id);
let manifest = self
.manifest_mgrs
.get(&region_id)
.context(RegionNotFoundSnafu { region_id })?
.manifest();
let manifest = region.manifest_ctx.manifest().await;
let region_id = manifest.metadata.region_id;
let current_files = &manifest.files;
let recently_removed_files = self.get_removed_files_expel_times(region_id).await?;
let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;
if recently_removed_files.is_empty() {
// no files to remove, skip
@@ -365,13 +350,11 @@ impl LocalGcWorker {
unused_len, region_id
);
// TODO(discord9): for now, ignore async index files as their design is not stable; this needs to be improved once
// the index file design is stable
let file_pairs: Vec<(FileId, FileId)> = unused_files
.iter()
.filter_map(|file_id| {
current_files
.get(file_id)
.map(|meta| (meta.file_id().file_id(), meta.index_file_id().file_id()))
})
.map(|file_id| (*file_id, *file_id))
.collect();
info!(
@@ -386,6 +369,9 @@ impl LocalGcWorker {
"Successfully deleted {} unused files for region {}",
unused_len, region_id
);
// TODO(discord9): update region manifest about deleted files
self.update_manifest_removed_files(&region, unused_files.clone())
.await?;
Ok(unused_files)
}
@@ -401,40 +387,31 @@ impl LocalGcWorker {
.await?;
// FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate, no clean way to fix it now
GC_DEL_FILE_CNT.add(file_ids.len() as i64);
GC_DELETE_FILE_CNT.add(file_ids.len() as i64);
Ok(())
}
/// Get the manifest manager for the region.
async fn open_mgr_for(&self, region_id: RegionId) -> Result<RegionManifestManager> {
let table_dir = self.access_layer.table_dir();
let path_type = self.access_layer.path_type();
let mito_config = &self.manifest_open_config;
/// Update the region manifest to clear files that were actually deleted
async fn update_manifest_removed_files(
&self,
region: &MitoRegionRef,
deleted_files: Vec<FileId>,
) -> Result<()> {
debug!(
"Trying to update manifest removed files for region {}",
region.region_id()
);
let region_manifest_options = RegionManifestOptions {
manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
table_dir, region_id, path_type,
)),
object_store: self.access_layer.object_store().clone(),
compress_type: manifest_compress_type(mito_config.compress_manifest),
checkpoint_distance: mito_config.manifest_checkpoint_distance,
remove_file_options: RemoveFileOptions {
keep_count: mito_config.experimental_manifest_keep_removed_file_count,
keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
},
};
let mut manager = region.manifest_ctx.manifest_manager.write().await;
let cnt = deleted_files.len();
manager.clear_deleted_files(deleted_files);
debug!(
"Updated region_id={} region manifest to clear {cnt} deleted files",
region.region_id(),
);
RegionManifestManager::open(
region_manifest_options,
Default::default(),
Default::default(),
)
.await?
.context(EmptyRegionDirSnafu {
region_id,
region_dir: &region_dir_from_table_dir(table_dir, region_id, path_type),
})
Ok(())
}
/// Get all the removed files in delta manifest files and their expel times.
@@ -443,14 +420,8 @@ impl LocalGcWorker {
///
pub async fn get_removed_files_expel_times(
&self,
region_id: RegionId,
region_manifest: &Arc<RegionManifest>,
) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
let region_manifest = self
.manifest_mgrs
.get(&region_id)
.context(RegionNotFoundSnafu { region_id })?
.manifest();
let mut ret = BTreeMap::new();
for files in &region_manifest.removed_files.removed_files {
let expel_time = Timestamp::new_millisecond(files.removed_at);
@@ -627,12 +598,17 @@ impl LocalGcWorker {
) -> Result<Vec<FileId>> {
let start = tokio::time::Instant::now();
let now = chrono::Utc::now();
let may_linger_until = now
- chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| {
DurationOutOfRangeSnafu {
input: self.opt.lingering_time,
}
})?;
let may_linger_until = self
.opt
.lingering_time
.map(|lingering_time| {
chrono::Duration::from_std(lingering_time)
.with_context(|_| DurationOutOfRangeSnafu {
input: lingering_time,
})
.map(|t| now - t)
})
.transpose()?;
let unknown_file_may_linger_until = now
- chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
@@ -642,9 +618,15 @@ impl LocalGcWorker {
)?;
// files that may linger, which means they are not in use but may still be kept for a while
let threshold = Timestamp::new_millisecond(may_linger_until.timestamp_millis());
let threshold =
may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
let mut recently_removed_files = recently_removed_files;
let may_linger_files = recently_removed_files.split_off(&threshold);
let may_linger_files = match threshold {
Some(threshold) => recently_removed_files.split_off(&threshold),
None => BTreeMap::new(),
};
debug!("may_linger_files: {:?}", may_linger_files);
let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();
let eligible_for_removal = recently_removed_files

View File

@@ -0,0 +1,401 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use api::v1::Rows;
use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionEngine as _;
use store_api::region_request::{RegionCompactRequest, RegionRequest};
use store_api::storage::{FileRefsManifest, RegionId};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::engine::compaction_test::{delete_and_flush, put_and_flush};
use crate::gc::{GcConfig, LocalGcWorker};
use crate::region::MitoRegionRef;
use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, rows_schema,
};
async fn create_gc_worker(
mito_engine: &MitoEngine,
regions: BTreeMap<RegionId, MitoRegionRef>,
file_ref_manifest: &FileRefsManifest,
full_file_listing: bool,
) -> LocalGcWorker {
let access_layer = regions.first_key_value().unwrap().1.access_layer.clone();
let cache_manager = mito_engine.cache_manager();
LocalGcWorker::try_new(
access_layer,
Some(cache_manager),
regions,
mito_engine.mito_config().gc.clone(),
file_ref_manifest.clone(),
&mito_engine.gc_limiter(),
full_file_listing,
)
.await
.unwrap()
}
/// Test that insert/flush followed by truncate allows the gc worker to delete files
#[tokio::test]
async fn test_gc_worker_basic_truncate() {
init_default_ut_logging();
let mut env = TestEnv::new().await;
env.log_store = Some(env.create_log_store().await);
// use an in-memory object store for the gc test, so it will use `ObjectStoreFilePurger`
env.object_store_manager = Some(Arc::new(env.create_in_memory_object_store_manager()));
let engine = env
.new_mito_engine(MitoConfig {
gc: GcConfig {
enable: true,
// for faster file deletion
lingering_time: None,
..Default::default()
},
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id, None).await;
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
let to_be_deleted_file_id = *manifest.files.iter().next().unwrap().0;
assert_eq!(manifest.files.len(), 1);
engine
.handle_request(
region.region_id,
RegionRequest::Truncate(store_api::region_request::RegionTruncateRequest::All),
)
.await
.unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert!(
manifest.removed_files.removed_files[0]
.file_ids
.contains(&to_be_deleted_file_id)
&& manifest.removed_files.removed_files[0].file_ids.len() == 1
&& manifest.files.is_empty(),
"Manifest after truncate: {:?}",
manifest
);
let version = manifest.manifest_version;
let regions = BTreeMap::from([(region_id, region.clone())]);
let file_ref_manifest = FileRefsManifest {
file_refs: Default::default(),
manifest_version: [(region_id, version)].into(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
let report = gc_worker.run().await.unwrap();
assert_eq!(
report.deleted_files.get(&region_id).unwrap(),
&vec![to_be_deleted_file_id],
);
assert!(report.need_retry_regions.is_empty());
let manifest = region.manifest_ctx.manifest().await;
assert!(manifest.removed_files.removed_files.is_empty() && manifest.files.is_empty());
}
/// Truncate with file refs should not delete files
#[tokio::test]
async fn test_gc_worker_truncate_with_ref() {
init_default_ut_logging();
let mut env = TestEnv::new().await;
env.log_store = Some(env.create_log_store().await);
// use an in-memory object store for the gc test, so it will use `ObjectStoreFilePurger`
env.object_store_manager = Some(Arc::new(env.create_in_memory_object_store_manager()));
let engine = env
.new_mito_engine(MitoConfig {
gc: GcConfig {
enable: true,
// for faster file deletion
lingering_time: None,
..Default::default()
},
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id, None).await;
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.files.len(), 1);
let to_be_deleted_file_id = *manifest.files.iter().next().unwrap().0;
engine
.handle_request(
region.region_id,
RegionRequest::Truncate(store_api::region_request::RegionTruncateRequest::All),
)
.await
.unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert!(
manifest.removed_files.removed_files[0]
.file_ids
.contains(&to_be_deleted_file_id)
&& manifest.removed_files.removed_files[0].file_ids.len() == 1
&& manifest.files.is_empty(),
"Manifest after truncate: {:?}",
manifest
);
let version = manifest.manifest_version;
let regions = BTreeMap::from([(region_id, region.clone())]);
let file_ref_manifest = FileRefsManifest {
file_refs: [(region_id, HashSet::from([to_be_deleted_file_id]))].into(),
manifest_version: [(region_id, version)].into(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
let report = gc_worker.run().await.unwrap();
assert!(report.deleted_files.get(&region_id).unwrap().is_empty());
assert!(report.need_retry_regions.is_empty());
let manifest = region.manifest_ctx.manifest().await;
assert!(
manifest.removed_files.removed_files[0].file_ids.len() == 1 && manifest.files.is_empty(),
"Manifest: {:?}",
manifest
);
}
/// Test that insert/flush followed by compact allows the gc worker to delete files
#[tokio::test]
async fn test_gc_worker_basic_compact() {
init_default_ut_logging();
let mut env = TestEnv::new().await;
env.log_store = Some(env.create_log_store().await);
// use an in-memory object store for the gc test, so it will use `ObjectStoreFilePurger`
env.object_store_manager = Some(Arc::new(env.create_in_memory_object_store_manager()));
let engine = env
.new_mito_engine(MitoConfig {
gc: GcConfig {
enable: true,
// for faster file deletion
lingering_time: None,
..Default::default()
},
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
assert_eq!(result.affected_rows, 0);
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.removed_files.removed_files[0].file_ids.len(), 3);
let version = manifest.manifest_version;
let regions = BTreeMap::from([(region_id, region.clone())]);
let file_ref_manifest = FileRefsManifest {
file_refs: Default::default(),
manifest_version: [(region_id, version)].into(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
let report = gc_worker.run().await.unwrap();
assert_eq!(report.deleted_files.get(&region_id).unwrap().len(), 3,);
assert!(report.need_retry_regions.is_empty());
}
/// Compact with file refs should not delete files
#[tokio::test]
async fn test_gc_worker_compact_with_ref() {
init_default_ut_logging();
let mut env = TestEnv::new().await;
env.log_store = Some(env.create_log_store().await);
// use an in-memory object store for the gc test, so it will use `ObjectStoreFilePurger`
env.object_store_manager = Some(Arc::new(env.create_in_memory_object_store_manager()));
let engine = env
.new_mito_engine(MitoConfig {
gc: GcConfig {
enable: true,
// for faster file deletion
lingering_time: None,
..Default::default()
},
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
assert_eq!(result.affected_rows, 0);
let region = engine.get_region(region_id).unwrap();
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.removed_files.removed_files[0].file_ids.len(), 3);
let version = manifest.manifest_version;
let regions = BTreeMap::from([(region_id, region.clone())]);
let file_ref_manifest = FileRefsManifest {
file_refs: HashMap::from([(
region_id,
manifest.removed_files.removed_files[0]
.file_ids
.iter()
.cloned()
.collect(),
)]),
manifest_version: [(region_id, version)].into(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
let report = gc_worker.run().await.unwrap();
assert_eq!(report.deleted_files.get(&region_id).unwrap().len(), 0);
assert!(report.need_retry_regions.is_empty());
}

View File

@@ -25,10 +25,9 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::{FileId, RegionId, SequenceNumber};
use strum::Display;
use crate::error::{
DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};
use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
use crate::manifest::manager::RemoveFileOptions;
use crate::region::ManifestStats;
use crate::sst::FormatType;
use crate::sst::file::FileMeta;
use crate::wal::EntryId;
@@ -236,13 +235,13 @@ impl RegionManifestBuilder {
self.flushed_entry_id = truncated_entry_id;
self.flushed_sequence = truncated_sequence;
self.truncated_entry_id = Some(truncated_entry_id);
self.files.clear();
self.removed_files.add_removed_files(
self.files.values().map(|meta| meta.file_id).collect(),
truncate
.timestamp_ms
.unwrap_or_else(|| Utc::now().timestamp_millis()),
);
self.files.clear();
}
TruncateKind::Partial { files_to_remove } => {
self.removed_files.add_removed_files(
@@ -294,6 +293,29 @@ pub struct RemovedFilesRecord {
pub removed_files: Vec<RemovedFiles>,
}
impl RemovedFilesRecord {
/// Clear the actually deleted files from the list of removed files
pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
for files in self.removed_files.iter_mut() {
files.file_ids.retain(|fid| !deleted_file_set.contains(fid));
}
self.removed_files.retain(|fs| !fs.file_ids.is_empty());
}
pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
let cnt = self
.removed_files
.iter()
.map(|r| r.file_ids.len() as u64)
.sum();
stats
.file_removed_cnt
.store(cnt, std::sync::atomic::Ordering::Relaxed);
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFiles {
/// The timestamp is the time when
@@ -306,6 +328,9 @@ pub struct RemovedFiles {
impl RemovedFilesRecord {
/// Add a record of removed files with the current timestamp.
pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
if file_ids.is_empty() {
return;
}
self.removed_files.push(RemovedFiles {
removed_at: at,
file_ids,
@@ -313,35 +338,13 @@ impl RemovedFilesRecord {
}
pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum();
if opt.keep_count > 0 && total_removed_files <= opt.keep_count {
if !opt.enable_gc {
// If GC is not enabled, always keep removed files empty.
self.removed_files.clear();
return Ok(());
}
let mut cur_file_cnt = total_removed_files;
let can_evict_until = chrono::Utc::now()
- chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu {
input: opt.keep_ttl,
})?;
self.removed_files.sort_unstable_by_key(|f| f.removed_at);
let updated = std::mem::take(&mut self.removed_files)
.into_iter()
.filter_map(|f| {
if f.removed_at < can_evict_until.timestamp_millis()
&& (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count)
{
// can evict all files
// TODO(discord9): maybe only evict to below keep_count? Maybe not, or the update might be too frequent.
cur_file_cnt -= f.file_ids.len();
None
} else {
Some(f)
}
})
.collect();
self.removed_files = updated;
// if GC is enabled, rely on gc worker to delete files, and evict removed files based on options.
Ok(())
}

View File

@@ -129,6 +129,8 @@ impl Checkpointer {
manifest.removed_files.evict_old_removed_files(opt)?;
// TODO(discord9): consider also checking the object store to clear removed files that are already deleted? How costly would that be?
Ok(manifest)
}

View File

@@ -21,6 +21,7 @@ use futures::TryStreamExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::FileId;
use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};
use crate::error::{
@@ -35,7 +36,7 @@ use crate::manifest::storage::{
ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file,
};
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState};
use crate::sst::FormatType;
/// Options for [RegionManifestManager].
@@ -53,23 +54,10 @@ pub struct RegionManifestOptions {
/// Options for updating `removed_files` field in [RegionManifest].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct RemoveFileOptions {
/// Number of removed files to keep in manifest's `removed_files` field before also
/// remove them from `removed_files`. Only remove files when both `keep_count` and `keep_duration` is reached.
pub keep_count: usize,
/// Duration to keep removed files in manifest's `removed_files` field before also
/// remove them from `removed_files`. Only remove files when both `keep_count` and `keep_duration` is reached.
pub keep_ttl: std::time::Duration,
}
#[cfg(any(test, feature = "test"))]
impl Default for RemoveFileOptions {
fn default() -> Self {
Self {
keep_count: 256,
keep_ttl: std::time::Duration::from_secs(3600),
}
}
/// Whether GC is enabled. If not, the removed files should always be empty when persisting manifest.
pub enable_gc: bool,
}
// rewrite note:
@@ -144,6 +132,7 @@ pub struct RegionManifestManager {
last_version: Arc<AtomicU64>,
checkpointer: Checkpointer,
manifest: Arc<RegionManifest>,
stats: ManifestStats,
stopped: bool,
}
@@ -153,17 +142,17 @@ impl RegionManifestManager {
metadata: RegionMetadataRef,
flushed_entry_id: u64,
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
manifest_version: Arc<AtomicU64>,
sst_format: FormatType,
stats: &ManifestStats,
) -> Result<Self> {
// construct storage
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
total_manifest_size,
stats.total_manifest_size.clone(),
);
let manifest_version = stats.manifest_version.clone();
info!(
"Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}",
@@ -213,11 +202,15 @@ impl RegionManifestManager {
let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
manifest_version.store(version, Ordering::Relaxed);
manifest
.removed_files
.update_file_removed_cnt_to_stats(stats);
Ok(Self {
store,
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
stats: stats.clone(),
stopped: false,
})
}
@@ -227,8 +220,7 @@ impl RegionManifestManager {
/// Returns `Ok(None)` if no such manifest.
pub async fn open(
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
manifest_version: Arc<AtomicU64>,
stats: &ManifestStats,
) -> Result<Option<Self>> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["open"])
@@ -239,8 +231,9 @@ impl RegionManifestManager {
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
total_manifest_size,
stats.total_manifest_size.clone(),
);
let manifest_version = stats.manifest_version.clone();
// recover from storage
// construct manifest builder
@@ -314,11 +307,15 @@ impl RegionManifestManager {
last_checkpoint_version,
);
manifest_version.store(version, Ordering::Relaxed);
manifest
.removed_files
.update_file_removed_cnt_to_stats(stats);
Ok(Some(Self {
store,
last_version: manifest_version,
checkpointer,
manifest: Arc::new(manifest),
stats: stats.clone(),
stopped: false,
}))
}
@@ -442,6 +439,9 @@ impl RegionManifestManager {
);
let version = self.last_version();
new_manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
self.manifest = Arc::new(new_manifest);
let last_version = self.set_version(self.manifest.manifest_version);
info!(
@@ -469,6 +469,9 @@ impl RegionManifestManager {
let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
let manifest = builder.try_build()?;
let last_version = self.set_version(manifest.manifest_version);
manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
self.manifest = Arc::new(manifest);
info!(
"Installed region manifest from checkpoint: {}, region: {}",
@@ -523,6 +526,9 @@ impl RegionManifestManager {
}
}
let new_manifest = manifest_builder.try_build()?;
new_manifest
.removed_files
.update_file_removed_cnt_to_stats(&self.stats);
let updated_manifest = self
.checkpointer
.update_manifest_removed_files(new_manifest)?;
@@ -534,6 +540,17 @@ impl RegionManifestManager {
Ok(version)
}
/// Clear deleted files from the manifest's `removed_files` field without updating the version. Note that if the datanode exits before a checkpoint, the manifest built when reopening the region may still contain these deleted files, which is acceptable for the gc process.
pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
let mut manifest = (*self.manifest()).clone();
manifest.removed_files.clear_deleted_files(deleted_files);
self.set_manifest(Arc::new(manifest));
}
pub(crate) fn set_manifest(&mut self, manifest: Arc<RegionManifest>) {
self.manifest = manifest;
}
/// Retrieves the current [RegionManifest].
pub fn manifest(&self) -> Arc<RegionManifest> {
self.manifest.clone()
@@ -923,6 +940,6 @@ mod test {
// get manifest size again
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, 1764);
assert_eq!(manifest_size, 1378);
}
}

View File

@@ -474,7 +474,7 @@ lazy_static! {
.unwrap();
/// Counter for the number of files deleted by the GC worker.
pub static ref GC_DEL_FILE_CNT: IntGauge =
pub static ref GC_DELETE_FILE_CNT: IntGauge =
register_int_gauge!(
"greptime_mito_gc_delete_file_count",
"mito gc deleted file count",

View File

@@ -507,6 +507,7 @@ impl MitoRegion {
let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
let num_files = version.ssts.num_files();
let manifest_version = self.stats.manifest_version();
let file_removed_cnt = self.stats.file_removed_cnt();
let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
let written_bytes = self.written_bytes.load(Ordering::Relaxed);
@@ -522,6 +523,7 @@ impl MitoRegion {
manifest: RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
file_removed_cnt,
},
data_topic_latest_entry_id: topic_latest_entry_id,
metadata_topic_latest_entry_id: topic_latest_entry_id,
@@ -1171,9 +1173,10 @@ pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
total_manifest_size: Arc<AtomicU64>,
manifest_version: Arc<AtomicU64>,
pub struct ManifestStats {
pub(crate) total_manifest_size: Arc<AtomicU64>,
pub(crate) manifest_version: Arc<AtomicU64>,
pub(crate) file_removed_cnt: Arc<AtomicU64>,
}
impl ManifestStats {
@@ -1184,6 +1187,10 @@ impl ManifestStats {
fn manifest_version(&self) -> u64 {
self.manifest_version.load(Ordering::Relaxed)
}
fn file_removed_cnt(&self) -> u64 {
self.file_removed_cnt.load(Ordering::Relaxed)
}
}
#[cfg(test)]
@@ -1289,9 +1296,8 @@ mod tests {
checkpoint_distance: 10,
remove_file_options: Default::default(),
},
Default::default(),
Default::default(),
FormatType::PrimaryKey,
&Default::default(),
)
.await
.unwrap();
@@ -1356,9 +1362,8 @@ mod tests {
checkpoint_distance: 10,
remove_file_options: Default::default(),
},
Default::default(),
Default::default(),
FormatType::PrimaryKey,
&Default::default(),
)
.await
.unwrap();

View File

@@ -65,7 +65,7 @@ use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::RegionFileId;
use crate::sst::file_purger::{FilePurgerRef, create_local_file_purger};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -279,9 +279,8 @@ impl RegionOpener {
metadata.clone(),
flushed_entry_id,
region_manifest_options,
self.stats.total_manifest_size.clone(),
self.stats.manifest_version.clone(),
sst_format,
&self.stats,
)
.await?;
@@ -322,7 +321,8 @@ impl RegionOpener {
manifest_manager,
RegionRoleState::Leader(RegionLeaderState::Writable),
)),
file_purger: create_local_file_purger(
file_purger: create_file_purger(
config.gc.enable,
self.purge_scheduler,
access_layer,
self.cache_manager,
@@ -351,7 +351,7 @@ impl RegionOpener {
let region = self
.maybe_open(config, wal)
.await?
.context(EmptyRegionDirSnafu {
.with_context(|| EmptyRegionDirSnafu {
region_id,
region_dir: &region_dir,
})?;
@@ -413,12 +413,8 @@ impl RegionOpener {
&self.region_dir(),
&self.object_store_manager,
)?;
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
self.stats.total_manifest_size.clone(),
self.stats.manifest_version.clone(),
)
.await?
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, &self.stats).await?
else {
return Ok(None);
};
@@ -459,7 +455,8 @@ impl RegionOpener {
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
));
let file_purger = create_local_file_purger(
let file_purger = create_file_purger(
config.gc.enable,
self.purge_scheduler.clone(),
access_layer.clone(),
self.cache_manager.clone(),
@@ -596,8 +593,7 @@ impl RegionOpener {
compress_type: manifest_compress_type(config.compress_manifest),
checkpoint_distance: config.manifest_checkpoint_distance,
remove_file_options: RemoveFileOptions {
keep_count: config.experimental_manifest_keep_removed_file_count,
keep_ttl: config.experimental_manifest_keep_removed_file_ttl,
enable_gc: config.gc.enable,
},
})
}
@@ -688,12 +684,8 @@ impl RegionMetadataLoader {
region_dir,
&self.object_store_manager,
)?;
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU64::new(0)),
)
.await?
let Some(manifest_manager) =
RegionManifestManager::open(region_manifest_options, &Default::default()).await?
else {
return Ok(None);
};

View File

@@ -80,15 +80,16 @@ pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
/// only manages the file references without deleting the actual files.
///
pub fn create_file_purger(
gc_enabled: bool,
scheduler: SchedulerRef,
sst_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
file_ref_manager: FileReferenceManagerRef,
) -> FilePurgerRef {
if is_local_fs(&sst_layer) {
Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
} else {
if gc_enabled && !is_local_fs(&sst_layer) {
Arc::new(ObjectStoreFilePurger { file_ref_manager })
} else {
Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))
}
}

View File

@@ -221,9 +221,9 @@ pub struct TestEnv {
data_home: TempDir,
intermediate_manager: IntermediateManager,
puffin_manager: PuffinManagerFactory,
log_store: Option<LogStoreImpl>,
pub(crate) log_store: Option<LogStoreImpl>,
log_store_factory: LogStoreFactory,
object_store_manager: Option<ObjectStoreManagerRef>,
pub(crate) object_store_manager: Option<ObjectStoreManagerRef>,
schema_metadata_manager: SchemaMetadataManagerRef,
file_ref_manager: FileReferenceManagerRef,
kv_backend: KvBackendRef,
@@ -287,7 +287,7 @@ impl TestEnv {
self.object_store_manager.clone()
}
async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine {
pub(crate) async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine {
async fn create<S: LogStore>(
zelf: &TestEnv,
config: MitoConfig,
@@ -541,30 +541,31 @@ impl TestEnv {
/// Returns the log store and object store manager.
async fn create_log_and_object_store_manager(&self) -> (LogStoreImpl, ObjectStoreManager) {
let log_store = self.create_log_store().await;
let object_store_manager = self.create_object_store_manager();
(log_store, object_store_manager)
}
pub(crate) async fn create_log_store(&self) -> LogStoreImpl {
let data_home = self.data_home.path();
let wal_path = data_home.join("wal");
let object_store_manager = self.create_object_store_manager();
match &self.log_store_factory {
LogStoreFactory::RaftEngine(factory) => {
let log_store = factory.create_log_store(wal_path).await;
(
LogStoreImpl::RaftEngine(Arc::new(log_store)),
object_store_manager,
)
LogStoreImpl::RaftEngine(Arc::new(log_store))
}
LogStoreFactory::Kafka(factory) => {
let log_store = factory.create_log_store().await;
(
LogStoreImpl::Kafka(Arc::new(log_store)),
object_store_manager,
)
LogStoreImpl::Kafka(Arc::new(log_store))
}
}
}
fn create_object_store_manager(&self) -> ObjectStoreManager {
pub(crate) fn create_object_store_manager(&self) -> ObjectStoreManager {
let data_home = self.data_home.path();
let data_path = data_home.join("data").as_path().display().to_string();
let builder = Fs::default().root(&data_path);
@@ -572,6 +573,12 @@ impl TestEnv {
ObjectStoreManager::new("default", object_store)
}
pub(crate) fn create_in_memory_object_store_manager(&self) -> ObjectStoreManager {
let builder = object_store::services::Memory::default();
let object_store = ObjectStore::new(builder).unwrap().finish();
ObjectStoreManager::new("memory", object_store)
}
/// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata`
/// is `None`, opens an existing manifest and returns `None` if no such manifest.
pub async fn create_manifest_manager(
@@ -608,14 +615,13 @@ impl TestEnv {
metadata,
0,
manifest_opts,
Default::default(),
Default::default(),
FormatType::PrimaryKey,
&Default::default(),
)
.await
.map(Some)
} else {
RegionManifestManager::open(manifest_opts, Default::default(), Default::default()).await
RegionManifestManager::open(manifest_opts, &Default::default()).await
}
}

View File

@@ -133,9 +133,8 @@ impl SchedulerEnv {
checkpoint_distance: 10,
remove_file_options: Default::default(),
},
Default::default(),
Default::default(),
FormatType::PrimaryKey,
&Default::default(),
)
.await
.unwrap(),

View File

@@ -204,7 +204,7 @@ impl From<PbGrantedRegion> for GrantedRegion {
/// The role of the region.
/// TODO(weny): rename it to `RegionRoleState`
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RegionRole {
// Readonly region(mito2)
Follower,
@@ -497,6 +497,8 @@ pub enum RegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
/// Number of removed files recorded in the manifest's `removed_files` field.
file_removed_cnt: u64,
},
Metric {
data_manifest_version: u64,
@@ -508,10 +510,11 @@ pub enum RegionManifestInfo {
impl RegionManifestInfo {
/// Creates a new [RegionManifestInfo] for mito2 engine.
pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
pub fn mito(manifest_version: u64, flushed_entry_id: u64, file_removal_rate: u64) -> Self {
Self::Mito {
manifest_version,
flushed_entry_id,
file_removed_cnt: file_removal_rate,
}
}
@@ -604,6 +607,7 @@ impl Default for RegionManifestInfo {
Self::Mito {
manifest_version: 0,
flushed_entry_id: 0,
file_removed_cnt: 0,
}
}
}

View File

@@ -1539,8 +1539,8 @@ type = "time_series"
[region_engine.mito.gc]
enable = false
lingering_time = "5m"
unknown_file_lingering_time = "6h"
lingering_time = "1m"
unknown_file_lingering_time = "1h"
max_concurrent_lister_per_gc_job = 32
max_concurrent_gc_job = 4