From a22667bf3ca3c0776e71cbf0759106c238422148 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 6 Nov 2024 22:08:43 +0800 Subject: [PATCH] feat: remove too large files --- src/mito2/src/compaction.rs | 12 ++++++++++++ src/mito2/src/compaction/twcs.rs | 15 +++++++++++++-- src/mito2/src/sst/version.rs | 9 +++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 3cea492071..e25f90345f 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use api::v1::region::compact_request; +use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_telemetry::{debug, error, info}; use common_time::range::TimestampRange; @@ -637,6 +638,17 @@ fn get_expired_ssts( .collect() } +/// Finds all SSTs larger than `size` across levels. +fn get_too_large_ssts( + levels: &[LevelMeta], + size: ReadableSize, + ssts_to_remove: &mut Vec, +) { + levels + .iter() + .for_each(|l| l.get_too_large_files(size, ssts_to_remove)); +} + #[cfg(test)] mod tests { use tokio::sync::oneshot; diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 2eefbb43aa..04bbe0caa4 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -16,6 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; +use common_base::readable_size::ReadableSize; use common_telemetry::{info, trace}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; @@ -25,7 +26,7 @@ use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::run::{find_sorted_runs, reduce_runs, Item}; -use crate::compaction::{get_expired_ssts, CompactionOutput}; +use crate::compaction::{get_expired_ssts, get_too_large_ssts, CompactionOutput}; use crate::sst::file::{overlaps, FileHandle, FileId, Level}; use crate::sst::version::LevelMeta; @@ -208,12 +209,22 @@ fn enforce_file_num(files: &[T], max_file_num: usize) -> Vec { files.iter().skip(min_idx).take(to_merge).cloned().collect() } +const MAX_FILE_SIZE: ReadableSize = ReadableSize::gb(4); + impl Picker for TwcsPicker { fn pick(&self, compaction_region: &CompactionRegion) -> Option { let region_id = compaction_region.region_id; let levels = compaction_region.current_version.ssts.levels(); let ttl = compaction_region.current_version.options.ttl; - let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis()); + + let mut expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis()); + let mut too_large_ssts = Vec::new(); + get_too_large_ssts(levels, MAX_FILE_SIZE, &mut too_large_ssts); + info!( + "Remove too large SSTs in region {}: {:?}", + region_id, too_large_ssts + ); + expired_ssts.append(&mut too_large_ssts); if !expired_ssts.is_empty() { info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts); // here we mark expired SSTs as compacting to avoid them being picked. diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index c677a95413..79af940ca7 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::fmt; use std::sync::Arc; +use common_base::readable_size::ReadableSize; use common_time::Timestamp; use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL}; @@ -171,6 +172,14 @@ impl LevelMeta { .collect() } + /// Returns too large SSTs from current level. + pub fn get_too_large_files(&self, size: ReadableSize, files: &mut Vec) { + self.files + .values() + .filter(|v| v.meta_ref().file_size > size.0) + .for_each(|f| files.push(f.clone())); + } + pub fn files(&self) -> impl Iterator { self.files.values() }