diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs
index 5cb7770e62..97c4bc9cf9 100644
--- a/src/mito2/src/cache/index.rs
+++ b/src/mito2/src/cache/index.rs
@@ -212,6 +212,8 @@ where
     }
 
     fn put_page(&self, key: K, page_key: PageKey, value: Bytes) {
+        // Clones the value to ensure it doesn't reference a larger buffer.
+        let value = Bytes::from(value.to_vec());
         CACHE_BYTES
             .with_label_values(&[INDEX_CONTENT_TYPE])
             .add((self.weight_of_content)(&(key, page_key), &value).into());
diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs
index ef8f8aecf2..caa3bd8435 100644
--- a/src/mito2/src/cache/index/bloom_filter_index.rs
+++ b/src/mito2/src/cache/index/bloom_filter_index.rs
@@ -15,7 +15,7 @@
 use std::ops::Range;
 use std::sync::Arc;
 
-use api::v1::index::BloomFilterMeta;
+use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
 use async_trait::async_trait;
 use bytes::Bytes;
 use index::bloom_filter::error::Result;
@@ -56,11 +56,17 @@ impl BloomFilterIndexCache {
 /// Calculates weight for bloom filter index metadata.
 fn bloom_filter_index_metadata_weight(
     k: &(FileId, ColumnId, Tag),
-    _: &Arc<BloomFilterMeta>,
+    meta: &Arc<BloomFilterMeta>,
 ) -> u32 {
-    (k.0.as_bytes().len()
+    let base = k.0.as_bytes().len()
         + std::mem::size_of::<ColumnId>()
-        + std::mem::size_of::<BloomFilterMeta>()) as u32
+        + std::mem::size_of::<BloomFilterMeta>()
+        + std::mem::size_of::<Tag>();
+
+    let vec_estimated = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
+        + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
+
+    (base + vec_estimated) as u32
 }
 
 /// Calculates weight for bloom filter index content.
@@ -167,6 +173,45 @@ mod test {
 
     const FUZZ_REPEAT_TIMES: usize = 100;
 
+    #[test]
+    fn bloom_filter_metadata_weight_counts_vec_contents() {
+        let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
+        let column_id: ColumnId = 42;
+        let tag = Tag::Skipping;
+
+        let meta = BloomFilterMeta {
+            rows_per_segment: 128,
+            segment_count: 2,
+            row_count: 256,
+            bloom_filter_size: 1024,
+            segment_loc_indices: vec![0, 64, 128, 192],
+            bloom_filter_locs: vec![
+                BloomFilterLoc {
+                    offset: 0,
+                    size: 512,
+                    element_count: 1000,
+                },
+                BloomFilterLoc {
+                    offset: 512,
+                    size: 512,
+                    element_count: 1000,
+                },
+            ],
+        };
+
+        let weight =
+            bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone()));
+
+        let base = file_id.as_bytes().len()
+            + std::mem::size_of::<ColumnId>()
+            + std::mem::size_of::<BloomFilterMeta>()
+            + std::mem::size_of::<Tag>();
+        let expected_dynamic = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
+            + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
+
+        assert_eq!(weight as usize, base + expected_dynamic);
+    }
+
     #[test]
     fn fuzz_index_calculation() {
         let mut rng = rand::rng();
diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs
index c148f57585..4942cd1035 100644
--- a/src/mito2/src/compaction/run.rs
+++ b/src/mito2/src/compaction/run.rs
@@ -179,10 +179,6 @@ impl FileGroup {
     pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
         self.files.into_iter()
     }
-
-    pub(crate) fn is_all_level_0(&self) -> bool {
-        self.files.iter().all(|f| f.level() == 0)
-    }
 }
 
 impl Ranged for FileGroup {
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index f950e45df2..a100c9d02e 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -89,7 +89,7 @@ impl TwcsPicker {
             {
                 let (kept_files, ignored_files) = files_to_merge
                     .into_iter()
-                    .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
+                    .partition(|fg| fg.size() <= max_size as usize);
                 files_to_merge = kept_files;
                 info!(
                     "Compaction for {} skipped {} large files in append mode for region {}, window {}, max_size: {}",
@@ -122,11 +122,13 @@
             };
             let total_input_files: usize = inputs.iter().map(|g| g.num_files()).sum();
             if total_input_files > max_input_file_num {
+                // Sorts file groups by size first.
+                inputs.sort_unstable_by_key(|fg| fg.size());
                 let mut num_picked_files = 0;
                 inputs = inputs
                     .into_iter()
-                    .take_while(|g| {
-                        let current_group_file_num = g.num_files();
+                    .take_while(|fg| {
+                        let current_group_file_num = fg.num_files();
                         if current_group_file_num + num_picked_files <= max_input_file_num {
                             num_picked_files += current_group_file_num;
                             true
@@ -141,6 +143,7 @@
                 );
             }
             if inputs.len() > 1 {
+                // If we have more than one group to compact.
                 log_pick_result(
                     region_id,
                     *window,
@@ -1048,5 +1051,85 @@ mod tests {
         assert!(!output.is_empty(), "Should have at least one output");
     }
 
+    #[test]
+    fn test_pick_multiple_runs() {
+        common_telemetry::init_default_ut_logging();
+
+        let num_files = 8;
+        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
+
+        // Create files with different sequences so they form multiple runs
+        let files: Vec<_> = file_ids
+            .iter()
+            .enumerate()
+            .map(|(idx, file_id)| {
+                new_file_handle_with_size_and_sequence(
+                    *file_id,
+                    0,
+                    999,
+                    0,
+                    (idx + 1) as u64,
+                    1024 * 1024,
+                )
+            })
+            .collect();
+
+        let mut windows = assign_to_windows(files.iter(), 3);
+
+        let picker = TwcsPicker {
+            trigger_file_num: 4,
+            time_window_seconds: Some(3),
+            max_output_file_size: None,
+            append_mode: false,
+            max_background_tasks: None,
+        };
+
+        let active_window = find_latest_window_in_seconds(files.iter(), 3);
+        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
+
+        assert_eq!(1, output.len());
+        assert_eq!(output[0].inputs.len(), 2);
+    }
+
+    #[test]
+    fn test_limit_max_input_files() {
+        common_telemetry::init_default_ut_logging();
+
+        let num_files = 50;
+        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
+
+        // Create files with different sequences so they form 2 runs
+        let files: Vec<_> = file_ids
+            .iter()
+            .enumerate()
+            .map(|(idx, file_id)| {
+                new_file_handle_with_size_and_sequence(
+                    *file_id,
+                    (idx / 2 * 10) as i64,
+                    (idx / 2 * 10 + 5) as i64,
+                    0,
+                    (idx + 1) as u64,
+                    1024 * 1024,
+                )
+            })
+            .collect();
+
+        let mut windows = assign_to_windows(files.iter(), 3);
+
+        let picker = TwcsPicker {
+            trigger_file_num: 4,
+            time_window_seconds: Some(3),
+            max_output_file_size: None,
+            append_mode: false,
+            max_background_tasks: None,
+        };
+
+        let active_window = find_latest_window_in_seconds(files.iter(), 3);
+        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
+
+        assert_eq!(1, output.len());
+        assert_eq!(output[0].inputs.len(), 32);
+    }
+
     // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
 }
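
Not part of the patch: a minimal standalone sketch of the `bytes` crate behavior that the `put_page` change in `src/mito2/src/cache/index.rs` relies on. `Bytes::slice` is zero-copy, so caching a small page can keep an entire parent buffer alive and make the recorded weight undercount real memory usage; copying the slice into a fresh `Vec` detaches it. The buffer sizes below are illustrative, not taken from the codebase.

```rust
use bytes::Bytes;

fn main() {
    // Pretend this is a whole index blob fetched from object storage (4 MiB).
    let blob = Bytes::from(vec![0u8; 4 * 1024 * 1024]);

    // `slice` is zero-copy: `page` shares the 4 MiB allocation, so caching it
    // pins far more memory than the 4 KiB the weight function accounts for.
    let page = blob.slice(0..4096);

    // Copying the bytes yields a `Bytes` backed by its own small allocation,
    // matching what the cache charges for the entry.
    let detached = Bytes::from(page.to_vec());
    assert_eq!(page, detached);

    // Once `blob` and `page` are dropped, the 4 MiB buffer can be freed even
    // while `detached` stays in the cache.
    drop(page);
    drop(blob);
    assert_eq!(detached.len(), 4096);
}
```

The same reasoning motivates the new metadata weight: the cached `BloomFilterMeta` owns `segment_loc_indices` and `bloom_filter_locs`, so their heap contents are charged to the cache as well.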