fix(mito): avoid shortcut in picking multi window files (#7174)

* fix/pick-continue:
 ### Add Tests for TWCS Compaction Logic

 - **`twcs.rs`**:
   - Modified the logic in `TwcsPicker` to handle cases with zero runs by using `continue` instead of `return`.
   - Added two new test cases: `test_build_output_multiple_windows_with_zero_runs` and `test_build_output_single_window_zero_runs` to verify the behavior of the compaction logic when there are zero runs in
 the windows.

 - **`memtable_util.rs`**:
   - Removed unused import `PredicateGroup`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: clippy

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/pick-continue:
 ### Commit Message

 Enhance Compaction Process with Expired SST Handling and Testing

 - **`compactor.rs`**:
   - Introduced handling for expired SSTs by updating the manifest immediately upon task completion.
   - Added new test cases to verify the handling of expired SSTs and manifest updates.

 - **`task.rs`**:
   - Implemented `remove_expired` function to handle expired SSTs by updating the manifest and notifying the region worker loop.
   - Refactored `handle_compaction` to `handle_expiration_and_compaction` to integrate expired SST removal before merging inputs.
   - Added logging and error handling for expired SST removal process.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/progressive-compaction:
 **Enhance Compaction Task Error Handling**

 - Updated `task.rs` to conditionally execute the removal of expired SST files only when they exist, improving error handling and performance.
 - Added a check for non-empty `expired_ssts` before initiating the removal process, ensuring unnecessary operations are avoided.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/progressive-compaction:
 ### Refactor `DefaultCompactor` to Extract `merge_single_output` Method

 - **File**: `src/mito2/src/compaction/compactor.rs`
   - Extracted the logic for merging a single compaction output into SST files into a new method `merge_single_output` within the `DefaultCompactor` struct.
   - Simplified the `merge_ssts` method by utilizing the new `merge_single_output` method, reducing code duplication and improving maintainability.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/progressive-compaction:
 ### Add Max Background Compaction Tasks Configuration

 - **`compaction.rs`**: Added `max_background_compactions` to the compaction scheduler to limit background tasks.
 - **`compaction/compactor.rs`**: Removed immediate manifest update logic after task completion.
 - **`compaction/picker.rs`**: Introduced `max_background_tasks` parameter in `new_picker` to control task limits.
 - **`compaction/twcs.rs`**: Updated `TwcsPicker` to include `max_background_tasks` and truncate inputs exceeding this limit. Added related test cases to ensure functionality.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/pick-continue:
 ### Improve Error Handling and Task Management in Compaction

 - **`task.rs`**: Enhanced error handling in `remove_expired` function by logging errors without halting the compaction process. Removed the return of `Result` type and added detailed logging for various
 failure scenarios.
 - **`twcs.rs`**: Adjusted task management logic by removing input truncation based on `max_background_tasks` and instead discarding remaining tasks if the output size exceeds the limit. This ensures better
 control over task execution and resource management.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/pick-continue:
 ### Add Unit Tests for Compaction Task and TWCS Picker

 - **`task.rs`**: Added unit tests to verify the behavior of `PickerOutput` with and without expired SSTs.
 - **`twcs.rs`**: Introduced tests for `TwcsPicker` to ensure correct handling of `max_background_tasks` during compaction, including scenarios with and without task truncation.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/pick-continue:
 **Improve Error Handling and Notification in Compaction Task**

 - **File:** `task.rs`
   - Changed log level from `warn` to `error` for manifest update failures to enhance error visibility.
   - Refactored the notification mechanism for expired file removal by using `BackgroundNotify::RegionEdit` with `RegionEditResult` to streamline the process.
   - Simplified error handling by consolidating match cases into a single `if let Err` block for better readability and maintainability.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-11-06 14:27:17 +08:00
committed by GitHub
parent 82812ff19e
commit c2ff563ac6
6 changed files with 488 additions and 129 deletions

View File

@@ -305,6 +305,7 @@ impl CompactionScheduler {
&options,
&request.current_version.options.compaction,
request.current_version.options.append_mode,
Some(self.engine_config.max_background_compactions),
);
let region_id = request.region_id();
let CompactionRequest {

View File

@@ -35,7 +35,7 @@ use crate::access_layer::{
};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{PickerOutput, new_picker};
use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_ttl};
use crate::config::MitoConfig;
use crate::error::{
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
@@ -313,6 +313,126 @@ pub trait Compactor: Send + Sync + 'static {
/// DefaultCompactor is the default implementation of Compactor.
pub struct DefaultCompactor;
impl DefaultCompactor {
/// Merge a single compaction output into SST files.
async fn merge_single_output(
compaction_region: CompactionRegion,
output: CompactionOutput,
write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
let region_id = compaction_region.region_id;
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
.current_version
.options
.index_options
.clone();
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let flat_format = compaction_region
.region_options
.sst_format
.map(|format| format == FormatType::Flat)
.unwrap_or(
compaction_region
.engine_config
.default_experimental_flat_format,
);
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
let input_file_names = output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.join(",");
let max_sequence = output
.inputs
.iter()
.map(|f| f.meta_ref().sequence)
.max()
.flatten();
let builder = CompactionSstReaderBuilder {
metadata: compaction_region.region_metadata.clone(),
sst_layer: compaction_region.access_layer.clone(),
cache: compaction_region.cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,
time_range: output.output_time_range,
merge_mode,
};
let source = if flat_format {
let reader = builder.build_flat_sst_reader().await?;
Either::Right(FlatSource::Stream(reader))
} else {
let reader = builder.build_sst_reader().await?;
Either::Left(Source::Reader(reader))
};
let mut metrics = Metrics::new(WriteType::Compaction);
let region_metadata = compaction_region.region_metadata.clone();
let sst_infos = compaction_region
.access_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
metadata: region_metadata.clone(),
source,
cache_manager: compaction_region.cache_manager.clone(),
storage,
max_sequence: max_sequence.map(NonZero::get),
index_options,
index_config,
inverted_index_config,
fulltext_index_config,
bloom_filter_index_config,
},
&write_opts,
&mut metrics,
)
.await?;
// Convert partition expression once outside the map
let partition_expr = match &region_metadata.partition_expr {
None => None,
Some(json_str) if json_str.is_empty() => None,
Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
InvalidPartitionExprSnafu {
expr: json_str.clone(),
}
})?,
};
let output_files = sst_infos
.into_iter()
.map(|sst_info| FileMeta {
region_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
partition_expr: partition_expr.clone(),
num_series: sst_info.num_series,
})
.collect::<Vec<_>>();
let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
info!(
"Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
Ok(output_files)
}
}
#[async_trait::async_trait]
impl Compactor for DefaultCompactor {
async fn merge_ssts(
@@ -324,131 +444,22 @@ impl Compactor for DefaultCompactor {
let mut compacted_inputs =
Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
let internal_parallelism = compaction_region.max_parallelism.max(1);
let compaction_time_window = picker_output.time_window_size;
for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
let inputs_to_remove: Vec<_> =
output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
compacted_inputs.extend(inputs_to_remove.iter().cloned());
let write_opts = WriteOptions {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
max_file_size: picker_output.max_file_size,
..Default::default()
};
let region_metadata = compaction_region.region_metadata.clone();
let sst_layer = compaction_region.access_layer.clone();
let region_id = compaction_region.region_id;
let cache_manager = compaction_region.cache_manager.clone();
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
.current_version
.options
.index_options
.clone();
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let flat_format = compaction_region
.region_options
.sst_format
.map(|format| format == FormatType::Flat)
.unwrap_or(
compaction_region
.engine_config
.default_experimental_flat_format,
);
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =
compaction_region.engine_config.bloom_filter_index.clone();
let max_sequence = output
.inputs
.iter()
.map(|f| f.meta_ref().sequence)
.max()
.flatten();
let region_metadata_for_filemeta = region_metadata.clone();
futs.push(async move {
let input_file_names = output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.join(",");
let builder = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
cache: cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,
time_range: output.output_time_range,
merge_mode,
};
let source = if flat_format {
let reader = builder.build_flat_sst_reader().await?;
either::Right(FlatSource::Stream(reader))
} else {
let reader = builder.build_sst_reader().await?;
either::Left(Source::Reader(reader))
};
let mut metrics = Metrics::new(WriteType::Compaction);
let sst_infos = sst_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
metadata: region_metadata,
source,
cache_manager,
storage,
max_sequence: max_sequence.map(NonZero::get),
index_options,
index_config,
inverted_index_config,
fulltext_index_config,
bloom_filter_index_config,
},
&write_opts,
&mut metrics,
)
.await?;
// Convert partition expression once outside the map
let partition_expr = match &region_metadata_for_filemeta.partition_expr {
None => None,
Some(json_str) if json_str.is_empty() => None,
Some(json_str) => {
PartitionExpr::from_json_str(json_str).with_context(|_| {
InvalidPartitionExprSnafu {
expr: json_str.clone(),
}
})?
}
};
let output_files = sst_infos
.into_iter()
.map(|sst_info| FileMeta {
region_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
partition_expr: partition_expr.clone(),
num_series: sst_info.num_series,
})
.collect::<Vec<_>>();
let output_file_names =
output_files.iter().map(|f| f.file_id.to_string()).join(",");
info!(
"Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
Ok(output_files)
});
futs.push(Self::merge_single_output(
compaction_region.clone(),
output,
write_opts,
));
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
@@ -466,6 +477,8 @@ impl Compactor for DefaultCompactor {
output_files.extend(metas.into_iter().flatten());
}
// In case of remote compaction, we still allow the region edit after merge to
// clean expired ssts.
let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
inputs.extend(
picker_output
@@ -477,7 +490,7 @@ impl Compactor for DefaultCompactor {
Ok(MergeOutput {
files_to_add: output_files,
files_to_remove: inputs,
compaction_time_window: Some(picker_output.time_window_size),
compaction_time_window: Some(compaction_time_window),
})
}
@@ -522,6 +535,7 @@ impl Compactor for DefaultCompactor {
&compact_request_options,
&compaction_region.region_options.compaction,
compaction_region.region_options.append_mode,
None,
)
.pick(compaction_region);

View File

@@ -125,6 +125,7 @@ pub fn new_picker(
compact_request_options: &compact_request::Options,
compaction_options: &CompactionOptions,
append_mode: bool,
max_background_tasks: Option<usize>,
) -> Arc<dyn Picker> {
if let compact_request::Options::StrictWindow(window) = compact_request_options {
let window = if window.window_seconds == 0 {
@@ -140,6 +141,7 @@ pub fn new_picker(
time_window_seconds: twcs_opts.time_window_seconds(),
max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
append_mode,
max_background_tasks,
}) as Arc<_>,
}
}

View File

@@ -16,19 +16,22 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Instant;
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use itertools::Itertools;
use snafu::ResultExt;
use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error::CompactRegionSnafu;
use crate::manifest::action::RegionEdit;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::region::RegionLeaderState;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
WorkerRequestWithTime,
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
WorkerRequest, WorkerRequestWithTime,
};
use crate::sst::file::FileMeta;
use crate::worker::WorkerListener;
use crate::{error, metrics};
@@ -78,9 +81,93 @@ impl CompactionTaskImpl {
.for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
}
async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
/// Remove expired ssts files, update manifest immediately
/// and apply the edit to region version.
///
/// This function logs errors but does not stop the compaction process if removal fails.
async fn remove_expired(
&self,
compaction_region: &CompactionRegion,
expired_files: Vec<FileMeta>,
) {
let region_id = compaction_region.region_id;
let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
// Update manifest to remove expired SSTs
let edit = RegionEdit {
files_to_add: Vec::new(),
files_to_remove: expired_files,
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
};
// 1. Update manifest
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = compaction_region
.manifest_ctx
.update_manifest(RegionLeaderState::Writable, action_list)
.await
{
error!(
e;
"Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
);
return;
}
// 2. Notify region worker loop to remove expired files from region version.
self.send_to_worker(WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
region_id,
sender: expire_delete_sender,
edit,
result: Ok(()),
}),
})
.await;
if let Err(e) = expire_delete_listener
.await
.context(error::RecvSnafu)
.flatten()
{
warn!(
e;
"Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
);
return;
}
info!(
"Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
);
}
async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
self.mark_files_compacting(true);
// 1. In case of local compaction, we can delete expired ssts in advance.
if !self.picker_output.expired_ssts.is_empty() {
let remove_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["remove_expired"])
.start_timer();
let expired_ssts = self
.picker_output
.expired_ssts
.drain(..)
.map(|f| f.meta_ref().clone())
.collect();
// remove_expired logs errors but doesn't stop compaction
self.remove_expired(&self.compaction_region, expired_ssts)
.await;
remove_timer.observe_duration();
}
// 2. Merge inputs
let merge_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["merge"])
.start_timer();
@@ -152,7 +239,7 @@ impl CompactionTaskImpl {
#[async_trait::async_trait]
impl CompactionTask for CompactionTaskImpl {
async fn run(&mut self) {
let notify = match self.handle_compaction().await {
let notify = match self.handle_expiration_and_compaction().await {
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,
senders: std::mem::take(&mut self.waiters),
@@ -178,3 +265,66 @@ impl CompactionTask for CompactionTaskImpl {
.await;
}
}
#[cfg(test)]
mod tests {
use store_api::storage::FileId;
use crate::compaction::picker::PickerOutput;
use crate::compaction::test_util::new_file_handle;
#[test]
fn test_picker_output_with_expired_ssts() {
// Test that PickerOutput correctly includes expired_ssts
// This verifies that expired SSTs are properly identified and included
// in the picker output, which is then handled by handle_expiration_and_compaction
let file_ids = (0..3).map(|_| FileId::random()).collect::<Vec<_>>();
let expired_ssts = vec![
new_file_handle(file_ids[0], 0, 999, 0),
new_file_handle(file_ids[1], 1000, 1999, 0),
];
let picker_output = PickerOutput {
outputs: vec![],
expired_ssts: expired_ssts.clone(),
time_window_size: 3600,
max_file_size: None,
};
// Verify expired_ssts are included
assert_eq!(picker_output.expired_ssts.len(), 2);
assert_eq!(
picker_output.expired_ssts[0].file_id(),
expired_ssts[0].file_id()
);
assert_eq!(
picker_output.expired_ssts[1].file_id(),
expired_ssts[1].file_id()
);
}
#[test]
fn test_picker_output_without_expired_ssts() {
// Test that PickerOutput works correctly when there are no expired SSTs
let picker_output = PickerOutput {
outputs: vec![],
expired_ssts: vec![],
time_window_size: 3600,
max_file_size: None,
};
// Verify empty expired_ssts
assert!(picker_output.expired_ssts.is_empty());
}
// Note: Testing remove_expired() directly requires extensive mocking of:
// - manifest_ctx (ManifestContext)
// - request_sender (mpsc::Sender<WorkerRequestWithTime>)
// - WorkerRequest handling
//
// The behavior is tested indirectly through integration tests:
// - remove_expired() logs errors but doesn't stop compaction
// - handle_expiration_and_compaction() continues even if remove_expired() encounters errors
// - The function is designed to be non-blocking for compaction
}

View File

@@ -18,7 +18,7 @@ use std::fmt::Debug;
use std::num::NonZeroU64;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
@@ -48,6 +48,8 @@ pub struct TwcsPicker {
pub max_output_file_size: Option<u64>,
/// Whether the target region is in append mode.
pub append_mode: bool,
/// Max background compaction tasks.
pub max_background_tasks: Option<usize>,
}
impl TwcsPicker {
@@ -88,7 +90,7 @@ impl TwcsPicker {
// because after compaction there will be no overlapping files.
let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
if found_runs == 0 {
return output;
continue;
}
let inputs = if found_runs > 1 {
@@ -119,6 +121,16 @@ impl TwcsPicker {
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
if let Some(max_background_tasks) = self.max_background_tasks
&& output.len() >= max_background_tasks
{
debug!(
"Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
region_id, max_background_tasks
);
break;
}
}
}
output
@@ -680,6 +692,7 @@ mod tests {
time_window_seconds: None,
max_output_file_size: None,
append_mode: false,
max_background_tasks: None,
}
.build_output(RegionId::from_u64(0), &mut windows, active_window);
@@ -831,5 +844,185 @@ mod tests {
}
}
#[test]
fn test_build_output_multiple_windows_with_zero_runs() {
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
let files = [
// Window 0: Contains 3 files but not forming any runs (not enough files in sequence to reach trigger_file_num)
new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
// Window 3: Contains files that will form 2 runs
new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
];
let mut windows = assign_to_windows(files.iter(), 3);
// Create picker with trigger_file_num of 4 so single files won't form runs in first window
let picker = TwcsPicker {
trigger_file_num: 4, // High enough to prevent runs in first window
time_window_seconds: Some(3),
max_output_file_size: None,
append_mode: false,
max_background_tasks: None,
};
let active_window = find_latest_window_in_seconds(files.iter(), 3);
let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
assert!(
!output.is_empty(),
"Should have output from windows with runs, even when one window has 0 runs"
);
let all_output_files: Vec<_> = output
.iter()
.flat_map(|o| o.inputs.iter())
.map(|f| f.file_id().file_id())
.collect();
assert!(
all_output_files.contains(&file_ids[3])
|| all_output_files.contains(&file_ids[4])
|| all_output_files.contains(&file_ids[5]),
"Output should contain files from the window with runs"
);
}
#[test]
fn test_build_output_single_window_zero_runs() {
let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();
let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000); // 2000 bytes
let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500); // 2500 bytes
let files = [large_file_1, large_file_2];
let mut windows = assign_to_windows(files.iter(), 3);
let picker = TwcsPicker {
trigger_file_num: 2,
time_window_seconds: Some(3),
max_output_file_size: Some(1000),
append_mode: true,
max_background_tasks: None,
};
let active_window = find_latest_window_in_seconds(files.iter(), 3);
let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);
// Should return empty output (no compaction needed)
assert!(
output.is_empty(),
"Should return empty output when no runs are found after filtering"
);
}
#[test]
fn test_max_background_tasks_truncation() {
let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
let max_background_tasks = 3;
// Create files across multiple windows that will generate multiple compaction outputs
let files = [
// Window 0: 4 files that will form a run
new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
// Window 3: 4 files that will form another run
new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
// Window 6: 4 files that will form another run
new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
];
let mut windows = assign_to_windows(files.iter(), 3);
let picker = TwcsPicker {
trigger_file_num: 4,
time_window_seconds: Some(3),
max_output_file_size: None,
append_mode: false,
max_background_tasks: Some(max_background_tasks),
};
let active_window = find_latest_window_in_seconds(files.iter(), 3);
let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
// Should have at most max_background_tasks outputs
assert!(
output.len() <= max_background_tasks,
"Output should be truncated to max_background_tasks: expected <= {}, got {}",
max_background_tasks,
output.len()
);
// Without max_background_tasks, should have more outputs
let picker_no_limit = TwcsPicker {
trigger_file_num: 4,
time_window_seconds: Some(3),
max_output_file_size: None,
append_mode: false,
max_background_tasks: None,
};
let mut windows_no_limit = assign_to_windows(files.iter(), 3);
let output_no_limit = picker_no_limit.build_output(
RegionId::from_u64(123),
&mut windows_no_limit,
active_window,
);
// Without limit, should have more outputs (if there are enough windows)
if output_no_limit.len() > max_background_tasks {
assert!(
output_no_limit.len() > output.len(),
"Without limit should have more outputs than with limit"
);
}
}
#[test]
fn test_max_background_tasks_no_truncation_when_under_limit() {
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
let max_background_tasks = 10; // Larger than expected outputs
// Create files in one window that will generate one compaction output
let files = [
new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
];
let mut windows = assign_to_windows(files.iter(), 3);
let picker = TwcsPicker {
trigger_file_num: 4,
time_window_seconds: Some(3),
max_output_file_size: None,
append_mode: false,
max_background_tasks: Some(max_background_tasks),
};
let active_window = find_latest_window_in_seconds(files.iter(), 3);
let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
// Should have all outputs since we're under the limit
assert!(
output.len() <= max_background_tasks,
"Output should be within limit"
);
// Should have at least one output
assert!(!output.is_empty(), "Should have at least one output");
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}

View File

@@ -40,7 +40,6 @@ use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::scan_region::PredicateGroup;
/// Empty memtable for test.
#[derive(Debug, Default)]