chore: cherry pick #7157, #7229, #7239 to 0.15 branch (#7256)

* fix: cache estimate methods (#7157)

* fix: cache estimate methods

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert page value change

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestion from @evenyag

Co-authored-by: Yingwen <realevenyag@gmail.com>

* update test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: clone the page before putting into the index cache (#7229)

* fix: clone the page before putting into the index cache

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix warnings

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: allow compacting L1 files under append mode (#7239)

* fix: allow compacting L1 files under append mode

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: limit the number of compaction input files

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Commit: 50eaa3c80a
Parent: b08bdcb465
Author: Yingwen
Committed by: GitHub
Date: 2025-11-19 14:41:35 +08:00
4 changed files with 137 additions and 11 deletions

View File

@@ -212,6 +212,8 @@ where
     }
 
     fn put_page(&self, key: K, page_key: PageKey, value: Bytes) {
+        // Clones the value to ensure it doesn't reference a larger buffer.
+        let value = Bytes::from(value.to_vec());
         CACHE_BYTES
             .with_label_values(&[INDEX_CONTENT_TYPE])
             .add((self.weight_of_content)(&(key, page_key), &value).into());
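
For context on the hunk above: slicing a `Bytes` is zero-copy, so a cached page would otherwise pin the entire decompressed buffer it was cut from. A minimal sketch of the detach pattern using only the `bytes` crate (buffer sizes are illustrative):

```rust
use bytes::Bytes;

fn main() {
    // A page sliced from a large buffer shares (and therefore pins)
    // the whole 4 MiB parent allocation.
    let parent = Bytes::from(vec![0u8; 4 * 1024 * 1024]);
    let page = parent.slice(0..4096);

    // Copying just the page's own bytes detaches it, so the cache keeps
    // only 4 KiB alive once `parent` is dropped.
    let owned = Bytes::from(page.to_vec());
    assert_eq!(owned.len(), 4096);
}
```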

View File

@@ -15,7 +15,7 @@
 use std::ops::Range;
 use std::sync::Arc;
 
-use api::v1::index::BloomFilterMeta;
+use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
 use async_trait::async_trait;
 use bytes::Bytes;
 use index::bloom_filter::error::Result;
@@ -56,11 +56,17 @@ impl BloomFilterIndexCache {
 /// Calculates weight for bloom filter index metadata.
 fn bloom_filter_index_metadata_weight(
     k: &(FileId, ColumnId, Tag),
-    _: &Arc<BloomFilterMeta>,
+    meta: &Arc<BloomFilterMeta>,
 ) -> u32 {
-    (k.0.as_bytes().len()
+    let base = k.0.as_bytes().len()
         + std::mem::size_of::<ColumnId>()
-        + std::mem::size_of::<BloomFilterMeta>()) as u32
+        + std::mem::size_of::<Tag>()
+        + std::mem::size_of::<BloomFilterMeta>();
+    let vec_estimated = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
+        + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
+    (base + vec_estimated) as u32
 }
 
 /// Calculates weight for bloom filter index content.
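
The fix above matters because the old weight counted only `size_of::<BloomFilterMeta>()`, ignoring the heap payload behind the struct's `Vec` fields, so the cache under-counted its real memory use. A self-contained sketch of the same estimate with hypothetical stand-in types (the real `BloomFilterMeta` and `BloomFilterLoc` are generated types; the field layouts below are assumptions):

```rust
use std::mem::size_of;

// Hypothetical stand-ins for the generated types; layouts are assumptions.
#[allow(dead_code)]
struct Loc { offset: u64, size: u64, element_count: u64 }
struct Meta { segment_loc_indices: Vec<u64>, bloom_filter_locs: Vec<Loc> }

// Fixed struct size (the Vec headers) plus the heap payload behind each Vec.
fn estimated_weight(meta: &Meta) -> usize {
    size_of::<Meta>()
        + meta.segment_loc_indices.len() * size_of::<u64>()
        + meta.bloom_filter_locs.len() * size_of::<Loc>()
}

fn main() {
    let meta = Meta {
        segment_loc_indices: vec![0, 64, 128, 192],
        bloom_filter_locs: vec![
            Loc { offset: 0, size: 512, element_count: 1000 },
            Loc { offset: 512, size: 512, element_count: 1000 },
        ],
    };
    // 4 * 8 B of indices + 2 * 24 B of locs on top of size_of::<Meta>().
    println!("estimated weight: {} bytes", estimated_weight(&meta));
}
```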
@@ -167,6 +173,45 @@ mod test {
     const FUZZ_REPEAT_TIMES: usize = 100;
 
+    #[test]
+    fn bloom_filter_metadata_weight_counts_vec_contents() {
+        let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
+        let column_id: ColumnId = 42;
+        let tag = Tag::Skipping;
+
+        let meta = BloomFilterMeta {
+            rows_per_segment: 128,
+            segment_count: 2,
+            row_count: 256,
+            bloom_filter_size: 1024,
+            segment_loc_indices: vec![0, 64, 128, 192],
+            bloom_filter_locs: vec![
+                BloomFilterLoc {
+                    offset: 0,
+                    size: 512,
+                    element_count: 1000,
+                },
+                BloomFilterLoc {
+                    offset: 512,
+                    size: 512,
+                    element_count: 1000,
+                },
+            ],
+        };
+
+        let weight =
+            bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone()));
+
+        let base = file_id.as_bytes().len()
+            + std::mem::size_of::<ColumnId>()
+            + std::mem::size_of::<Tag>()
+            + std::mem::size_of::<BloomFilterMeta>();
+        let expected_dynamic = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
+            + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
+        assert_eq!(weight as usize, base + expected_dynamic);
+    }
+
     #[test]
     fn fuzz_index_calculation() {
         let mut rng = rand::rng();

View File

@@ -179,10 +179,6 @@ impl FileGroup {
     pub(crate) fn into_files(self) -> impl Iterator<Item = FileHandle> {
         self.files.into_iter()
     }
-
-    pub(crate) fn is_all_level_0(&self) -> bool {
-        self.files.iter().all(|f| f.level() == 0)
-    }
 }
 
 impl Ranged for FileGroup {
impl Ranged for FileGroup {

View File

@@ -89,7 +89,7 @@ impl TwcsPicker {
         {
             let (kept_files, ignored_files) = files_to_merge
                 .into_iter()
-                .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
+                .partition(|fg| fg.size() <= max_size as usize);
             files_to_merge = kept_files;
             info!(
                 "Compaction for {} skipped {} large files in append mode for region {}, window {}, max_size: {}",
@@ -122,11 +122,13 @@ impl TwcsPicker {
         };
         let total_input_files: usize = inputs.iter().map(|g| g.num_files()).sum();
         if total_input_files > max_input_file_num {
+            // Sorts file groups by size first.
+            inputs.sort_unstable_by_key(|fg| fg.size());
             let mut num_picked_files = 0;
             inputs = inputs
                 .into_iter()
-                .take_while(|g| {
-                    let current_group_file_num = g.num_files();
+                .take_while(|fg| {
+                    let current_group_file_num = fg.num_files();
                     if current_group_file_num + num_picked_files <= max_input_file_num {
                         num_picked_files += current_group_file_num;
                         true
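
To make the capping logic concrete: after sorting groups by size ascending, the picker greedily keeps whole groups while the running file count stays under the cap. A runnable sketch under assumed types (`Group` and the cap value are illustrative):

```rust
#[derive(Debug)]
struct Group {
    size: u64,
    num_files: usize,
}

fn limit_input_files(mut inputs: Vec<Group>, max_input_file_num: usize) -> Vec<Group> {
    // Prefer small groups so more of them fit under the cap.
    inputs.sort_unstable_by_key(|g| g.size);
    let mut picked = 0;
    inputs
        .into_iter()
        .take_while(|g| {
            if picked + g.num_files <= max_input_file_num {
                picked += g.num_files;
                true
            } else {
                false
            }
        })
        .collect()
}

fn main() {
    let groups = vec![
        Group { size: 10, num_files: 20 },
        Group { size: 5, num_files: 20 },
        Group { size: 20, num_files: 20 },
    ];
    // With a cap of 40 files, only the two smallest groups survive.
    assert_eq!(limit_input_files(groups, 40).len(), 2);
}
```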
@@ -141,6 +143,7 @@ impl TwcsPicker {
             );
         }
         if inputs.len() > 1 {
+            // If we have more than one group to compact.
             log_pick_result(
                 region_id,
                 *window,
@@ -1048,5 +1051,85 @@ mod tests {
         assert!(!output.is_empty(), "Should have at least one output");
     }
 
+    #[test]
+    fn test_pick_multiple_runs() {
+        common_telemetry::init_default_ut_logging();
+        let num_files = 8;
+        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
+        // Create files with different sequences so they form multiple runs
+        let files: Vec<_> = file_ids
+            .iter()
+            .enumerate()
+            .map(|(idx, file_id)| {
+                new_file_handle_with_size_and_sequence(
+                    *file_id,
+                    0,
+                    999,
+                    0,
+                    (idx + 1) as u64,
+                    1024 * 1024,
+                )
+            })
+            .collect();
+        let mut windows = assign_to_windows(files.iter(), 3);
+        let picker = TwcsPicker {
+            trigger_file_num: 4,
+            time_window_seconds: Some(3),
+            max_output_file_size: None,
+            append_mode: false,
+            max_background_tasks: None,
+        };
+        let active_window = find_latest_window_in_seconds(files.iter(), 3);
+        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
+        assert_eq!(1, output.len());
+        assert_eq!(output[0].inputs.len(), 2);
+    }
+
+    #[test]
+    fn test_limit_max_input_files() {
+        common_telemetry::init_default_ut_logging();
+        let num_files = 50;
+        let file_ids = (0..num_files).map(|_| FileId::random()).collect::<Vec<_>>();
+        // Create files with different sequences so they form 2 runs
+        let files: Vec<_> = file_ids
+            .iter()
+            .enumerate()
+            .map(|(idx, file_id)| {
+                new_file_handle_with_size_and_sequence(
+                    *file_id,
+                    (idx / 2 * 10) as i64,
+                    (idx / 2 * 10 + 5) as i64,
+                    0,
+                    (idx + 1) as u64,
+                    1024 * 1024,
+                )
+            })
+            .collect();
+        let mut windows = assign_to_windows(files.iter(), 3);
+        let picker = TwcsPicker {
+            trigger_file_num: 4,
+            time_window_seconds: Some(3),
+            max_output_file_size: None,
+            append_mode: false,
+            max_background_tasks: None,
+        };
+        let active_window = find_latest_window_in_seconds(files.iter(), 3);
+        let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
+        assert_eq!(1, output.len());
+        assert_eq!(output[0].inputs.len(), 32);
+    }
+
     // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
 }