diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs
index ba425741cf..900c4c7df6 100644
--- a/src/mito2/src/compaction/run.rs
+++ b/src/mito2/src/compaction/run.rs
@@ -18,8 +18,9 @@
 use common_base::readable_size::ReadableSize;
 use common_base::BitVec;
 use common_time::Timestamp;
+use smallvec::{smallvec, SmallVec};
 
-use crate::sst::file::FileHandle;
+use crate::sst::file::{FileHandle, FileId};
 
 /// Default max compaction output file size when not specified.
 const DEFAULT_MAX_OUTPUT_SIZE: u64 = ReadableSize::gb(2).as_bytes();
@@ -125,17 +126,68 @@
 pub trait Item: Ranged + Clone {
     fn size(&self) -> usize;
 }
 
-impl Ranged for FileHandle {
-    type BoundType = Timestamp;
+/// A group of files that are created by the same compaction task.
+#[derive(Debug, Clone)]
+pub struct FileGroup {
+    files: SmallVec<[FileHandle; 2]>,
+    size: usize,
+    num_rows: usize,
+    min_timestamp: Timestamp,
+    max_timestamp: Timestamp,
+}
 
-    fn range(&self) -> (Self::BoundType, Self::BoundType) {
-        self.time_range()
+impl FileGroup {
+    pub(crate) fn new_with_file(file: FileHandle) -> Self {
+        let size = file.size() as usize;
+        let (min_timestamp, max_timestamp) = file.time_range();
+        let num_rows = file.num_rows();
+        Self {
+            files: smallvec![file],
+            size,
+            num_rows,
+            min_timestamp,
+            max_timestamp,
+        }
+    }
+
+    pub(crate) fn num_rows(&self) -> usize {
+        self.num_rows
+    }
+
+    pub(crate) fn add_file(&mut self, file: FileHandle) {
+        self.size += file.size() as usize;
+        self.num_rows += file.num_rows();
+        let (min_timestamp, max_timestamp) = file.time_range();
+        self.min_timestamp = self.min_timestamp.min(min_timestamp);
+        self.max_timestamp = self.max_timestamp.max(max_timestamp);
+        self.files.push(file);
+    }
+
+    #[cfg(test)]
+    pub(crate) fn files(&self) -> &[FileHandle] {
+        &self.files[..]
+    }
+
+    pub(crate) fn file_ids(&self) -> SmallVec<[FileId; 2]> {
+        SmallVec::from_iter(self.files.iter().map(|f| f.file_id()))
+    }
+
+    pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
+        self.files.into_iter()
     }
 }
 
-impl Item for FileHandle {
+impl Ranged for FileGroup {
+    type BoundType = Timestamp;
+
+    fn range(&self) -> (Self::BoundType, Self::BoundType) {
+        (self.min_timestamp, self.max_timestamp)
+    }
+}
+
+impl Item for FileGroup {
     fn size(&self) -> usize {
-        self.size() as usize
+        self.size
     }
 }
diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs
index 26d4d1c3a8..f1b067e12f 100644
--- a/src/mito2/src/compaction/test_util.rs
+++ b/src/mito2/src/compaction/test_util.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::num::NonZeroU64;
+
 use common_time::Timestamp;
 
 use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
@@ -23,6 +25,23 @@
 pub fn new_file_handle(
     file_id: FileId,
     start_ts_millis: i64,
     end_ts_millis: i64,
     level: Level,
+) -> FileHandle {
+    new_file_handle_with_sequence(
+        file_id,
+        start_ts_millis,
+        end_ts_millis,
+        level,
+        start_ts_millis as u64,
+    )
+}
+
+/// Test util to create file handles.
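+/// Files created with the same `sequence` end up in the same `FileGroup` when they are
+/// assigned to a compaction window.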
+pub fn new_file_handle_with_sequence(
+    file_id: FileId,
+    start_ts_millis: i64,
+    end_ts_millis: i64,
+    level: Level,
+    sequence: u64,
 ) -> FileHandle {
     let file_purger = new_noop_file_purger();
     FileHandle::new(
@@ -39,7 +58,7 @@
             index_file_size: 0,
             num_rows: 0,
             num_row_groups: 0,
-            sequence: None,
+            sequence: NonZeroU64::new(sequence),
         },
         file_purger,
     )
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index fc8f8e2bad..6d669d171d 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -15,6 +15,7 @@
 use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
+use std::num::NonZeroU64;
 
 use common_base::readable_size::ReadableSize;
 use common_telemetry::info;
@@ -26,7 +27,9 @@
 use store_api::storage::RegionId;
 
 use crate::compaction::buckets::infer_time_bucket;
 use crate::compaction::compactor::CompactionRegion;
 use crate::compaction::picker::{Picker, PickerOutput};
-use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
+use crate::compaction::run::{
+    find_sorted_runs, merge_seq_files, reduce_runs, FileGroup, Item, Ranged,
+};
 use crate::compaction::{get_expired_ssts, CompactionOutput};
 use crate::sst::file::{overlaps, FileHandle, Level};
 use crate::sst::version::LevelMeta;
@@ -60,7 +63,8 @@ impl TwcsPicker {
             if files.files.is_empty() {
                 continue;
             }
-            let sorted_runs = find_sorted_runs(&mut files.files);
+            let mut files_to_merge: Vec<_> = files.files().cloned().collect();
+            let sorted_runs = find_sorted_runs(&mut files_to_merge);
             let found_runs = sorted_runs.len();
             // We only remove deletion markers if we found less than 2 runs and not in append mode.
             // because after compaction there will be no overlapping files.
@@ -90,7 +94,7 @@
             );
             output.push(CompactionOutput {
                 output_level: LEVEL_COMPACTED, // always compact to l1
-                inputs,
+                inputs: inputs.into_iter().flat_map(|fg| fg.into_files()).collect(),
                 filter_deleted,
                 output_time_range: None, // we do not enforce output time range in twcs compactions.
             });
@@ -109,21 +113,21 @@ fn log_pick_result(
     file_num: usize,
     max_output_file_size: Option<u64>,
     filter_deleted: bool,
-    inputs: &[FileHandle],
+    inputs: &[FileGroup],
 ) {
     let input_file_str: Vec<String> = inputs
         .iter()
         .map(|f| {
-            let range = f.time_range();
+            let range = f.range();
             let start = range.0.to_iso8601_string();
             let end = range.1.to_iso8601_string();
             let num_rows = f.num_rows();
             format!(
-                "SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
-                f.file_id(),
+                "FileGroup{{id: {:?}, range: ({}, {}), size: {}, num rows: {} }}",
+                f.file_ids(),
                 start,
                 end,
-                ReadableSize(f.size()),
+                ReadableSize(f.size() as u64),
                 num_rows
             )
         })
@@ -198,7 +202,9 @@ impl Picker for TwcsPicker {
 struct Window {
     start: Timestamp,
     end: Timestamp,
-    files: Vec<FileHandle>,
+    // Mapping from file sequence to file groups. Files with the same sequence are
+    // considered to be created by the same compaction task.
+    files: HashMap<Option<NonZeroU64>, FileGroup>,
     time_window: i64,
     overlapping: bool,
 }
@@ -207,10 +213,11 @@ impl Window {
     /// Creates a new [Window] with given file.
     fn new_with_file(file: FileHandle) -> Self {
         let (start, end) = file.time_range();
+        let files = HashMap::from([(file.meta_ref().sequence, FileGroup::new_with_file(file))]);
         Self {
             start,
             end,
-            files: vec![file],
+            files,
             time_window: 0,
             overlapping: false,
         }
     }
@@ -226,7 +233,19 @@
         let (start, end) = file.time_range();
         self.start = self.start.min(start);
         self.end = self.end.max(end);
-        self.files.push(file);
+
+        match self.files.entry(file.meta_ref().sequence) {
+            Entry::Occupied(mut o) => {
+                o.get_mut().add_file(file);
+            }
+            Entry::Vacant(v) => {
+                v.insert(FileGroup::new_with_file(file));
+            }
+        }
+    }
+
+    fn files(&self) -> impl Iterator<Item = &FileGroup> {
+        self.files.values()
     }
 }
@@ -311,7 +330,7 @@ mod tests {
     use std::collections::HashSet;
 
     use super::*;
-    use crate::compaction::test_util::new_file_handle;
+    use crate::compaction::test_util::{new_file_handle, new_file_handle_with_sequence};
     use crate::sst::file::{FileId, Level};
 
     #[test]
@@ -371,7 +390,9 @@
             .iter(),
             3,
         );
-        assert_eq!(5, windows.get(&0).unwrap().files.len());
+        let fgs = &windows.get(&0).unwrap().files;
+        assert_eq!(1, fgs.len());
+        assert_eq!(fgs.values().map(|f| f.files().len()).sum::<usize>(), 5);
 
         let files = [FileId::random(); 3];
         let windows = assign_to_windows(
@@ -385,15 +406,56 @@
         );
         assert_eq!(
             files[0],
-            windows.get(&0).unwrap().files.first().unwrap().file_id()
+            windows.get(&0).unwrap().files().next().unwrap().files()[0].file_id()
         );
         assert_eq!(
             files[1],
-            windows.get(&3).unwrap().files.first().unwrap().file_id()
+            windows.get(&3).unwrap().files().next().unwrap().files()[0].file_id()
         );
         assert_eq!(
             files[2],
-            windows.get(&12).unwrap().files.first().unwrap().file_id()
+            windows.get(&12).unwrap().files().next().unwrap().files()[0].file_id()
+        );
+    }
+
+    #[test]
+    fn test_assign_file_groups_to_windows() {
+        let files = [
+            FileId::random(),
+            FileId::random(),
+            FileId::random(),
+            FileId::random(),
+        ];
+        let windows = assign_to_windows(
+            [
+                new_file_handle_with_sequence(files[0], 0, 999, 0, 1),
+                new_file_handle_with_sequence(files[1], 0, 999, 0, 1),
+                new_file_handle_with_sequence(files[2], 0, 999, 0, 2),
+                new_file_handle_with_sequence(files[3], 0, 999, 0, 2),
+            ]
+            .iter(),
+            3,
+        );
+        assert_eq!(windows.len(), 1);
+        let fgs = &windows.get(&0).unwrap().files;
+        assert_eq!(2, fgs.len());
+        assert_eq!(
+            fgs.get(&NonZeroU64::new(1))
+                .unwrap()
+                .files()
+                .iter()
+                .map(|f| f.file_id())
+                .collect::<HashSet<_>>(),
+            [files[0], files[1]].into_iter().collect()
+        );
+        assert_eq!(
+            fgs.get(&NonZeroU64::new(2))
+                .unwrap()
+                .files()
+                .iter()
+                .map(|f| f.file_id())
+                .collect::<HashSet<_>>(),
+            [files[2], files[3]].into_iter().collect()
         );
     }
@@ -408,8 +470,22 @@
         ];
         files[0].set_compacting(true);
         files[2].set_compacting(true);
-        let windows = assign_to_windows(files.iter(), 3);
-        assert_eq!(3, windows.get(&0).unwrap().files.len());
+        let mut windows = assign_to_windows(files.iter(), 3);
+        let window0 = windows.remove(&0).unwrap();
+        assert_eq!(1, window0.files.len());
+        let candidates = window0
+            .files
+            .into_values()
+            .flat_map(|fg| fg.into_files())
+            .map(|f| f.file_id())
+            .collect::<HashSet<_>>();
+        assert_eq!(candidates.len(), 3);
+        assert_eq!(
+            candidates,
+            [files[1].file_id(), files[3].file_id(), files[4].file_id()]
+                .into_iter()
+                .collect::<HashSet<_>>()
+        );
     }
 
     /// (Window value, overlapping, files' time ranges in window)
@@ -438,9 +514,11 @@
         let mut file_ranges = actual_window
             .files
             .iter()
-            .map(|f| {
-                let (s, e) = f.time_range();
-                (s.value(), e.value())
+            .flat_map(|(_, f)| {
+                f.files().iter().map(|f| {
+                    let (s, e) = f.time_range();
+                    (s.value(), e.value())
+                })
             })
             .collect::<Vec<_>>();
         file_ranges.sort_unstable_by(|l, r| l.0.cmp(&r.0).then(l.1.cmp(&r.1)));
@@ -607,10 +685,10 @@
         CompactionPickerTestCase {
             window_size: 3,
             input_files: [
-                new_file_handle(file_ids[0], -2000, -3, 0),
-                new_file_handle(file_ids[1], -3000, -100, 0),
-                new_file_handle(file_ids[2], 0, 2999, 0), //active windows
-                new_file_handle(file_ids[3], 50, 2998, 0), //active windows
+                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
+                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
+                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3), //active windows
+                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4), //active windows
             ]
             .to_vec(),
             expected_outputs: vec![
@@ -636,11 +714,11 @@
         CompactionPickerTestCase {
             window_size: 3,
             input_files: [
-                new_file_handle(file_ids[0], -2000, -3, 0),
-                new_file_handle(file_ids[1], -3000, -100, 0),
-                new_file_handle(file_ids[2], 0, 2999, 0),
-                new_file_handle(file_ids[3], 50, 2998, 0),
-                new_file_handle(file_ids[4], 11, 2990, 0),
+                new_file_handle_with_sequence(file_ids[0], -2000, -3, 0, 1),
+                new_file_handle_with_sequence(file_ids[1], -3000, -100, 0, 2),
+                new_file_handle_with_sequence(file_ids[2], 0, 2999, 0, 3),
+                new_file_handle_with_sequence(file_ids[3], 50, 2998, 0, 4),
+                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 5),
             ]
             .to_vec(),
             expected_outputs: vec![
@@ -655,6 +733,27 @@
             ],
         }
         .check();
+
+        // Case 3:
+        // A compaction may split its output into several files that have overlapping time
+        // ranges and the same sequence; these files should be treated as one FileGroup.
+        let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
+        CompactionPickerTestCase {
+            window_size: 3,
+            input_files: [
+                new_file_handle_with_sequence(file_ids[0], 0, 2999, 1, 1),
+                new_file_handle_with_sequence(file_ids[1], 0, 2998, 1, 1),
+                new_file_handle_with_sequence(file_ids[2], 3000, 5999, 1, 2),
+                new_file_handle_with_sequence(file_ids[3], 3000, 5000, 1, 2),
+                new_file_handle_with_sequence(file_ids[4], 11, 2990, 0, 3),
+            ]
+            .to_vec(),
+            expected_outputs: vec![ExpectedOutput {
+                input_files: vec![0, 1, 4],
+                output_level: 1,
+            }],
+        }
+        .check();
     }
 
     // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index 0d6d0e62c0..fa24250737 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -15,6 +15,7 @@
 //! Utilities to mock version.
 
 use std::collections::HashMap;
+use std::num::NonZeroU64;
 use std::sync::Arc;
 
 use api::v1::value::ValueData;
@@ -103,7 +104,7 @@ impl VersionControlBuilder {
                 index_file_size: 0,
                 num_rows: 0,
                 num_row_groups: 0,
-                sequence: None,
+                sequence: NonZeroU64::new(start_ms as u64),
             },
         );
         self
@@ -196,7 +197,7 @@ pub(crate) fn apply_edit(
                 index_file_size: 0,
                 num_rows: 0,
                 num_row_groups: 0,
-                sequence: None,
+                sequence: NonZeroU64::new(*start_ms as u64),
             }
         })
         .collect();