feat: introduce file group in compaction (#6261)

* fix/file-group-in-compaction:
 ### Enhance Compaction Logic with File Grouping

 - **`run.rs`**: Introduced `FileGroup` struct to manage groups of `FileHandle` objects, allowing for more efficient compaction operations. Updated `Ranged` and `Item` trait implementations to work with `FileGroup`.
 - **`test_util.rs`**: Added `new_file_handle_with_sequence` function to support file handles with sequence numbers, enhancing test utilities.
 - **`twcs.rs`**: Modified `TwcsPicker` to utilize `FileGroup` for managing files within windows, improving compaction logic. Updated `Window` struct to use `HashMap` for storing `FileGroup` objects.
 - **`version_util.rs`**: Updated version control utilities to handle sequence numbers in file metadata, aligning with new compaction logic.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* fix/file-group-in-compaction:
 ### Add Test for File Group Assignment in TWCS

 - **Enhancements in `twcs.rs`:**
   - Added a new test `test_assign_file_groups_to_windows` to verify the correct assignment of file groups to windows.
   - Enhanced `test_assign_compacting_to_windows` with a new case to ensure files with overlapping time ranges and the same sequence are treated as one `FileGroup`.

Signed-off-by: Lei, HUANG <lhuang@greptime.com>

* fix/file-group-in-compaction:
 **Enhance Compaction Task Documentation and Initialization**

 - **`run.rs`**: Added documentation for `FileGroup` to clarify its role in representing a group of files created by the same compaction task.
 - **`twcs.rs`**: Added comments to the `Window` struct explaining that file groups are keyed by file sequence and that files sharing a sequence are considered to be created by the same compaction task. Simplified the initialization of the `files` hashmap using `HashMap::from`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <lhuang@greptime.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-12 17:33:40 +08:00
committed by GitHub
parent f6db419afd
commit 5bb0466ff2
4 changed files with 211 additions and 40 deletions

View File

@@ -18,8 +18,9 @@
use common_base::readable_size::ReadableSize;
use common_base::BitVec;
use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};
use crate::sst::file::FileHandle;
use crate::sst::file::{FileHandle, FileId};
/// Default max compaction output file size when not specified.
const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
@@ -125,17 +126,68 @@ pub trait Item: Ranged + Clone {
fn size(&self) -> usize;
}
impl Ranged for FileHandle {
type BoundType = Timestamp;
/// A group of files that are created by the same compaction task.
///
/// The aggregate fields (`size`, `num_rows`, `min_timestamp`, `max_timestamp`) are
/// maintained incrementally as files are added, so they always describe the whole group.
#[derive(Debug, Clone)]
pub struct FileGroup {
    /// Handles of the files that belong to this group.
    files: SmallVec<[FileHandle; 2]>,
    /// Sum of `FileHandle::size()` over all files in the group.
    size: usize,
    /// Sum of `FileHandle::num_rows()` over all files in the group.
    num_rows: usize,
    /// Smallest start of the time ranges of the files in the group.
    min_timestamp: Timestamp,
    /// Largest end of the time ranges of the files in the group.
    max_timestamp: Timestamp,
}
fn range(&self) -> (Self::BoundType, Self::BoundType) {
self.time_range()
impl FileGroup {
pub(crate) fn new_with_file(file: FileHandle) -> Self {
let size = file.size() as usize;
let (min_timestamp, max_timestamp) = file.time_range();
let num_rows = file.num_rows();
Self {
files: smallvec![file],
size,
num_rows,
min_timestamp,
max_timestamp,
}
}
pub(crate) fn num_rows(&self) -> usize {
self.num_rows
}
pub(crate) fn add_file(&mut self, file: FileHandle) {
self.size += file.size() as usize;
self.num_rows += file.num_rows();
let (min_timestamp, max_timestamp) = file.time_range();
self.min_timestamp = self.min_timestamp.min(min_timestamp);
self.max_timestamp = self.max_timestamp.max(max_timestamp);
self.files.push(file);
}
#[cfg(test)]
pub(crate) fn files(&self) -> &[FileHandle] {
&self.files[..]
}
pub(crate) fn file_ids(&self) -> SmallVec<[FileId; 2]> {
SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
}
pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
self.files.into_iter()
}
}
impl Item for FileHandle {
impl Ranged for FileGroup {
    type BoundType = Timestamp;

    /// Returns the time range covered by the whole group as
    /// `(min_timestamp, max_timestamp)`.
    fn range(&self) -> (Self::BoundType, Self::BoundType) {
        let lower = self.min_timestamp;
        let upper = self.max_timestamp;
        (lower, upper)
    }
}
impl Item for FileGroup {
fn size(&self) -> usize {
self.size() as usize
self.size
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroU64;
use common_time::Timestamp;
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
@@ -23,6 +25,23 @@ pub fn new_file_handle(
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
) -> FileHandle {
new_file_handle_with_sequence(
file_id,
start_ts_millis,
end_ts_millis,
level,
start_ts_millis as u64,
)
}
/// Test util to create file handles.
pub fn new_file_handle_with_sequence(
file_id: FileId,
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
sequence: u64,
) -> FileHandle {
let file_purger = new_noop_file_purger();
FileHandle::new(
@@ -39,7 +58,7 @@ pub fn new_file_handle(
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
sequence: NonZeroU64::new(sequence),
},
file_purger,
)

View File

@@ -15,6 +15,7 @@
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
@@ -26,7 +27,9 @@ use store_api::storage::RegionId;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
use crate::compaction::run::{
find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, Level};
use crate::sst::version::LevelMeta;
@@ -60,7 +63,8 @@ impl TwcsPicker {
if files.files.is_empty() {
continue;
}
let sorted_runs = find_sorted_runs(&mut files.files);
let mut files_to_merge: Vec<_> = files.files().cloned().collect();
let sorted_runs = find_sorted_runs(&mut files_to_merge);
let found_runs = sorted_runs.len();
// We only remove deletion markers if we found fewer than 2 runs and are not in append mode,
// because after compaction there will be no overlapping files.
@@ -90,7 +94,7 @@ impl TwcsPicker {
);
output.push(CompactionOutput {
output_level: LEVEL_COMPACTED, // always compact to l1
inputs,
inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
@@ -109,21 +113,21 @@ fn log_pick_result(
file_num: usize,
max_output_file_size: Option<u64>,
filter_deleted: bool,
inputs: &[FileHandle],
inputs: &[FileGroup],
) {
let input_file_str: Vec<String> = inputs
.iter()
.map(|f| {
let range = f.time_range();
let range = f.range();
let start = range.0.to_iso8601_string();
let end = range.1.to_iso8601_string();
let num_rows = f.num_rows();
format!(
"SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
f.file_id(),
"FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
f.file_ids(),
start,
end,
ReadableSize(f.size()),
ReadableSize(f.size() as u64),
num_rows
)
})
@@ -198,7 +202,9 @@ impl Picker for TwcsPicker {
struct Window {
start: Timestamp,
end: Timestamp,
files: Vec<FileHandle>,
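// Mapping from file sequence to file groups. Files with the same sequence are considered
// to be created by the same compaction task.
// NOTE(review): files without a sequence all share the `None` key and therefore end up
// in the same group - confirm this is intended.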
files: HashMap<Option<NonZeroU64>, FileGroup>,
time_window: i64,
overlapping: bool,
}
@@ -207,10 +213,11 @@ impl Window {
/// Creates a new [Window] with given file.
fn new_with_file(file: FileHandle) -> Self {
let (start, end) = file.time_range();
let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
Self {
start,
end,
files: vec![file],
files,
time_window: 0,
overlapping: false,
}
@@ -226,7 +233,19 @@ impl Window {
let (start, end) = file.time_range();
self.start = self.start.min(start);
self.end = self.end.max(end);
self.files.push(file);
match self.files.entry(file.meta_ref().sequence) {
Entry::Occupied(mut o) => {
o.get_mut().add_file(file);
}
Entry::Vacant(v) => {
v.insert(FileGroup::new_with_file(file));
}
}
}
/// Returns an iterator over the file groups in this window.
///
/// The groups are stored in a `HashMap`, so the iteration order is arbitrary.
fn files(&self) -> impl Iterator<Item = &FileGroup> {
self.files.values()
}
}
@@ -311,7 +330,7 @@ mod tests {
use std::collections::HashSet;
use super::*;
use crate::compaction::test_util::new_file_handle;
use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
use crate::sst::file::{FileId, Level};
#[test]
@@ -371,7 +390,9 @@ mod tests {
.iter(),
3,
);
assert_eq!(5, windows.get(&0).unwrap().files.len());
let fgs = &windows.get(&0).unwrap().files;
assert_eq!(1, fgs.len());
assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
let files = [FileId::random(); 3];
let windows = assign_to_windows(
@@ -385,15 +406,56 @@ mod tests {
);
assert_eq!(
files[0],
windows.get(&0).unwrap().files.first().unwrap().file_id()
windows.get(&0).unwrap().files().next().unwrap().files()[0].file_id()
);
assert_eq!(
files[1],
windows.get(&3).unwrap().files.first().unwrap().file_id()
windows.get(&3).unwrap().files().next().unwrap().files()[0].file_id()
);
assert_eq!(
files[2],
windows.get(&12).unwrap().files.first().unwrap().file_id()
windows.get(&12).unwrap().files().next().unwrap().files()[0].file_id()
);
}
/// Files that share a sequence must land in the same [FileGroup] of a window.
#[test]
fn test_assign_file_groups_to_windows() {
    // Four files with identical time ranges; each pair shares a sequence number.
    let files = [
        FileId::random(),
        FileId::random(),
        FileId::random(),
        FileId::random(),
    ];
    let handles = [
        new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
        new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
        new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
        new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
    ];
    let windows = assign_to_windows(handles.iter(), 3);

    // Everything falls into one window, split into one group per sequence.
    assert_eq!(windows.len(), 1);
    let groups = &windows.get(&0).unwrap().files;
    assert_eq!(2, groups.len());

    // Collects the ids of the files grouped under `sequence`.
    let ids_in_group = |sequence: u64| -> HashSet<FileId> {
        groups
            .get(&NonZeroU64::new(sequence))
            .unwrap()
            .files()
            .iter()
            .map(|handle| handle.file_id())
            .collect()
    };
    assert_eq!(ids_in_group(1), HashSet::from([files[0], files[1]]));
    assert_eq!(ids_in_group(2), HashSet::from([files[2], files[3]]));
}
@@ -408,8 +470,22 @@ mod tests {
];
files[0].set_compacting(true);
files[2].set_compacting(true);
let windows = assign_to_windows(files.iter(), 3);
assert_eq!(3, windows.get(&0).unwrap().files.len());
let mut windows = assign_to_windows(files.iter(), 3);
let window0 = windows.remove(&0).unwrap();
assert_eq!(1, window0.files.len());
let candidates = window0
.files
.into_values()
.flat_map(|fg| fg.into_files())
.map(|f| f.file_id())
.collect::<HashSet<_>>();
assert_eq!(candidates.len(), 3);
assert_eq!(
candidates,
[files[1].file_id(), files[3].file_id(), files[4].file_id()]
.into_iter()
.collect::<HashSet<_>>()
);
}
/// (Window value, overlapping, files' time ranges in window)
@@ -438,9 +514,11 @@ mod tests {
let mut file_ranges = actual_window
.files
.iter()
.map(|f| {
let (s, e) = f.time_range();
(s.value(), e.value())
.flat_map(|(_, f)| {
f.files().iter().map(|f| {
let (s, e) = f.time_range();
(s.value(), e.value())
})
})
.collect::<Vec<_>>();
file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
@@ -607,10 +685,10 @@ mod tests {
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0), //active windows
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
]
.to_vec(),
expected_outputs: vec![
@@ -636,11 +714,11 @@ mod tests {
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0),
new_file_handle(file_ids[3], 50, 2998, 0),
new_file_handle(file_ids[4], 11, 2990, 0),
new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
]
.to_vec(),
expected_outputs: vec![
@@ -655,6 +733,27 @@ mod tests {
],
}
.check();
// Case 3:
// A compaction may split its output into several files that have overlapping time ranges
// and the same sequence; we should treat these files as one FileGroup.
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
]
.to_vec(),
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1, 4],
output_level: 1,
}],
}
.check();
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.

View File

@@ -15,6 +15,7 @@
//! Utilities to mock version.
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use api::v1::value::ValueData;
@@ -103,7 +104,7 @@ impl VersionControlBuilder {
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
sequence: NonZeroU64::new(start_ms as u64),
},
);
self
@@ -196,7 +197,7 @@ pub(crate) fn apply_edit(
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
sequence: NonZeroU64::new(*start_ms as u64),
}
})
.collect();