From c2ff563ac65d0691d90e4f7b7959a9778f10f699 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 6 Nov 2025 14:27:17 +0800 Subject: [PATCH] fix(mito): avoid shortcut in picking multi window files (#7174) * fix/pick-continue: ### Add Tests for TWCS Compaction Logic - **`twcs.rs`**: - Modified the logic in `TwcsPicker` to handle cases with zero runs by using `continue` instead of `return`. - Added two new test cases: `test_build_output_multiple_windows_with_zero_runs` and `test_build_output_single_window_zero_runs` to verify the behavior of the compaction logic when there are zero runs in the windows. - **`memtable_util.rs`**: - Removed unused import `PredicateGroup`. Signed-off-by: Lei, HUANG * fix: clippy Signed-off-by: Lei, HUANG * fix/pick-continue: ### Commit Message Enhance Compaction Process with Expired SST Handling and Testing - **`compactor.rs`**: - Introduced handling for expired SSTs by updating the manifest immediately upon task completion. - Added new test cases to verify the handling of expired SSTs and manifest updates. - **`task.rs`**: - Implemented `remove_expired` function to handle expired SSTs by updating the manifest and notifying the region worker loop. - Refactored `handle_compaction` to `handle_expiration_and_compaction` to integrate expired SST removal before merging inputs. - Added logging and error handling for expired SST removal process. Signed-off-by: Lei, HUANG * refactor/progressive-compaction: **Enhance Compaction Task Error Handling** - Updated `task.rs` to conditionally execute the removal of expired SST files only when they exist, improving error handling and performance. - Added a check for non-empty `expired_ssts` before initiating the removal process, ensuring unnecessary operations are avoided. 
Signed-off-by: Lei, HUANG * refactor/progressive-compaction: ### Refactor `DefaultCompactor` to Extract `merge_single_output` Method - **File**: `src/mito2/src/compaction/compactor.rs` - Extracted the logic for merging a single compaction output into SST files into a new method `merge_single_output` within the `DefaultCompactor` struct. - Simplified the `merge_ssts` method by utilizing the new `merge_single_output` method, reducing code duplication and improving maintainability. Signed-off-by: Lei, HUANG * refactor/progressive-compaction: ### Add Max Background Compaction Tasks Configuration - **`compaction.rs`**: Added `max_background_compactions` to the compaction scheduler to limit background tasks. - **`compaction/compactor.rs`**: Removed immediate manifest update logic after task completion. - **`compaction/picker.rs`**: Introduced `max_background_tasks` parameter in `new_picker` to control task limits. - **`compaction/twcs.rs`**: Updated `TwcsPicker` to include `max_background_tasks` and truncate inputs exceeding this limit. Added related test cases to ensure functionality. Signed-off-by: Lei, HUANG * fix/pick-continue: ### Improve Error Handling and Task Management in Compaction - **`task.rs`**: Enhanced error handling in `remove_expired` function by logging errors without halting the compaction process. Removed the return of `Result` type and added detailed logging for various failure scenarios. - **`twcs.rs`**: Adjusted task management logic by removing input truncation based on `max_background_tasks` and instead discarding remaining tasks if the output size exceeds the limit. This ensures better control over task execution and resource management. Signed-off-by: Lei, HUANG * fix/pick-continue: ### Add Unit Tests for Compaction Task and TWCS Picker - **`task.rs`**: Added unit tests to verify the behavior of `PickerOutput` with and without expired SSTs. 
- **`twcs.rs`**: Introduced tests for `TwcsPicker` to ensure correct handling of `max_background_tasks` during compaction, including scenarios with and without task truncation. Signed-off-by: Lei, HUANG * fix/pick-continue: **Improve Error Handling and Notification in Compaction Task** - **File:** `task.rs` - Changed log level from `warn` to `error` for manifest update failures to enhance error visibility. - Refactored the notification mechanism for expired file removal by using `BackgroundNotify::RegionEdit` with `RegionEditResult` to streamline the process. - Simplified error handling by consolidating match cases into a single `if let Err` block for better readability and maintainability. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/mito2/src/compaction.rs | 1 + src/mito2/src/compaction/compactor.rs | 254 ++++++++++++----------- src/mito2/src/compaction/picker.rs | 2 + src/mito2/src/compaction/task.rs | 162 ++++++++++++++- src/mito2/src/compaction/twcs.rs | 197 +++++++++++++++++- src/mito2/src/test_util/memtable_util.rs | 1 - 6 files changed, 488 insertions(+), 129 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index d83ed7ab7d..be4c12aa1b 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -305,6 +305,7 @@ impl CompactionScheduler { &options, &request.current_version.options.compaction, request.current_version.options.append_mode, + Some(self.engine_config.max_background_compactions), ); let region_id = request.region_id(); let CompactionRequest { diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 8a1a44d4c2..71698471c3 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -35,7 +35,7 @@ use crate::access_layer::{ }; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::{PickerOutput, new_picker}; -use crate::compaction::{CompactionSstReaderBuilder, find_ttl}; 
+use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_ttl}; use crate::config::MitoConfig; use crate::error::{ EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result, @@ -313,6 +313,126 @@ pub trait Compactor: Send + Sync + 'static { /// DefaultCompactor is the default implementation of Compactor. pub struct DefaultCompactor; +impl DefaultCompactor { + /// Merge a single compaction output into SST files. + async fn merge_single_output( + compaction_region: CompactionRegion, + output: CompactionOutput, + write_opts: WriteOptions, + ) -> Result> { + let region_id = compaction_region.region_id; + let storage = compaction_region.region_options.storage.clone(); + let index_options = compaction_region + .current_version + .options + .index_options + .clone(); + let append_mode = compaction_region.current_version.options.append_mode; + let merge_mode = compaction_region.current_version.options.merge_mode(); + let flat_format = compaction_region + .region_options + .sst_format + .map(|format| format == FormatType::Flat) + .unwrap_or( + compaction_region + .engine_config + .default_experimental_flat_format, + ); + + let index_config = compaction_region.engine_config.index.clone(); + let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); + let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); + let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone(); + + let input_file_names = output + .inputs + .iter() + .map(|f| f.file_id().to_string()) + .join(","); + let max_sequence = output + .inputs + .iter() + .map(|f| f.meta_ref().sequence) + .max() + .flatten(); + let builder = CompactionSstReaderBuilder { + metadata: compaction_region.region_metadata.clone(), + sst_layer: compaction_region.access_layer.clone(), + cache: compaction_region.cache_manager.clone(), + inputs: &output.inputs, + append_mode, + filter_deleted: 
output.filter_deleted, + time_range: output.output_time_range, + merge_mode, + }; + let source = if flat_format { + let reader = builder.build_flat_sst_reader().await?; + Either::Right(FlatSource::Stream(reader)) + } else { + let reader = builder.build_sst_reader().await?; + Either::Left(Source::Reader(reader)) + }; + let mut metrics = Metrics::new(WriteType::Compaction); + let region_metadata = compaction_region.region_metadata.clone(); + let sst_infos = compaction_region + .access_layer + .write_sst( + SstWriteRequest { + op_type: OperationType::Compact, + metadata: region_metadata.clone(), + source, + cache_manager: compaction_region.cache_manager.clone(), + storage, + max_sequence: max_sequence.map(NonZero::get), + index_options, + index_config, + inverted_index_config, + fulltext_index_config, + bloom_filter_index_config, + }, + &write_opts, + &mut metrics, + ) + .await?; + // Convert partition expression once outside the map + let partition_expr = match &region_metadata.partition_expr { + None => None, + Some(json_str) if json_str.is_empty() => None, + Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| { + InvalidPartitionExprSnafu { + expr: json_str.clone(), + } + })?, + }; + + let output_files = sst_infos + .into_iter() + .map(|sst_info| FileMeta { + region_id, + file_id: sst_info.file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, + available_indexes: sst_info.index_metadata.build_available_indexes(), + index_file_size: sst_info.index_metadata.file_size, + index_file_id: None, + num_rows: sst_info.num_rows as u64, + num_row_groups: sst_info.num_row_groups, + sequence: max_sequence, + partition_expr: partition_expr.clone(), + num_series: sst_info.num_series, + }) + .collect::<Vec<_>>(); + let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(","); + info!( + "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}", + region_id,
input_file_names, output_file_names, flat_format, metrics + ); + metrics.observe(); + Ok(output_files) + } +} + #[async_trait::async_trait] impl Compactor for DefaultCompactor { async fn merge_ssts( @@ -324,131 +444,22 @@ impl Compactor for DefaultCompactor { let mut compacted_inputs = Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); let internal_parallelism = compaction_region.max_parallelism.max(1); + let compaction_time_window = picker_output.time_window_size; for output in picker_output.outputs.drain(..) { - compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); + let inputs_to_remove: Vec<_> = + output.inputs.iter().map(|f| f.meta_ref().clone()).collect(); + compacted_inputs.extend(inputs_to_remove.iter().cloned()); let write_opts = WriteOptions { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, max_file_size: picker_output.max_file_size, ..Default::default() }; - - let region_metadata = compaction_region.region_metadata.clone(); - let sst_layer = compaction_region.access_layer.clone(); - let region_id = compaction_region.region_id; - let cache_manager = compaction_region.cache_manager.clone(); - let storage = compaction_region.region_options.storage.clone(); - let index_options = compaction_region - .current_version - .options - .index_options - .clone(); - let append_mode = compaction_region.current_version.options.append_mode; - let merge_mode = compaction_region.current_version.options.merge_mode(); - let flat_format = compaction_region - .region_options - .sst_format - .map(|format| format == FormatType::Flat) - .unwrap_or( - compaction_region - .engine_config - .default_experimental_flat_format, - ); - let index_config = compaction_region.engine_config.index.clone(); - let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); - let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); - let bloom_filter_index_config = - 
compaction_region.engine_config.bloom_filter_index.clone(); - let max_sequence = output - .inputs - .iter() - .map(|f| f.meta_ref().sequence) - .max() - .flatten(); - let region_metadata_for_filemeta = region_metadata.clone(); - futs.push(async move { - let input_file_names = output - .inputs - .iter() - .map(|f| f.file_id().to_string()) - .join(","); - let builder = CompactionSstReaderBuilder { - metadata: region_metadata.clone(), - sst_layer: sst_layer.clone(), - cache: cache_manager.clone(), - inputs: &output.inputs, - append_mode, - filter_deleted: output.filter_deleted, - time_range: output.output_time_range, - merge_mode, - }; - let source = if flat_format { - let reader = builder.build_flat_sst_reader().await?; - either::Right(FlatSource::Stream(reader)) - } else { - let reader = builder.build_sst_reader().await?; - either::Left(Source::Reader(reader)) - }; - let mut metrics = Metrics::new(WriteType::Compaction); - let sst_infos = sst_layer - .write_sst( - SstWriteRequest { - op_type: OperationType::Compact, - metadata: region_metadata, - source, - cache_manager, - storage, - max_sequence: max_sequence.map(NonZero::get), - index_options, - index_config, - inverted_index_config, - fulltext_index_config, - bloom_filter_index_config, - }, - &write_opts, - &mut metrics, - ) - .await?; - // Convert partition expression once outside the map - let partition_expr = match &region_metadata_for_filemeta.partition_expr { - None => None, - Some(json_str) if json_str.is_empty() => None, - Some(json_str) => { - PartitionExpr::from_json_str(json_str).with_context(|_| { - InvalidPartitionExprSnafu { - expr: json_str.clone(), - } - })?
- } - }; - - let output_files = sst_infos - .into_iter() - .map(|sst_info| FileMeta { - region_id, - file_id: sst_info.file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, - available_indexes: sst_info.index_metadata.build_available_indexes(), - index_file_size: sst_info.index_metadata.file_size, - index_file_id: None, - num_rows: sst_info.num_rows as u64, - num_row_groups: sst_info.num_row_groups, - sequence: max_sequence, - partition_expr: partition_expr.clone(), - num_series: sst_info.num_series, - }) - .collect::>(); - let output_file_names = - output_files.iter().map(|f| f.file_id.to_string()).join(","); - info!( - "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}", - region_id, input_file_names, output_file_names, flat_format, metrics - ); - metrics.observe(); - Ok(output_files) - }); + futs.push(Self::merge_single_output( + compaction_region.clone(), + output, + write_opts, + )); } let mut output_files = Vec::with_capacity(futs.len()); while !futs.is_empty() { @@ -466,6 +477,8 @@ impl Compactor for DefaultCompactor { output_files.extend(metas.into_iter().flatten()); } + // In case of remote compaction, we still allow the region edit after merge to + // clean expired ssts. 
let mut inputs: Vec<_> = compacted_inputs.into_iter().collect(); inputs.extend( picker_output @@ -477,7 +490,7 @@ impl Compactor for DefaultCompactor { Ok(MergeOutput { files_to_add: output_files, files_to_remove: inputs, - compaction_time_window: Some(picker_output.time_window_size), + compaction_time_window: Some(compaction_time_window), }) } @@ -522,6 +535,7 @@ impl Compactor for DefaultCompactor { &compact_request_options, &compaction_region.region_options.compaction, compaction_region.region_options.append_mode, + None, ) .pick(compaction_region); diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 16540e1e02..7c5cccfb8c 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -125,6 +125,7 @@ pub fn new_picker( compact_request_options: &compact_request::Options, compaction_options: &CompactionOptions, append_mode: bool, + max_background_tasks: Option, ) -> Arc { if let compact_request::Options::StrictWindow(window) = compact_request_options { let window = if window.window_seconds == 0 { @@ -140,6 +141,7 @@ pub fn new_picker( time_window_seconds: twcs_opts.time_window_seconds(), max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()), append_mode, + max_background_tasks, }) as Arc<_>, } } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index e193665e7a..b54d4c7c62 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -16,19 +16,22 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::Instant; -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; +use itertools::Itertools; use snafu::ResultExt; use tokio::sync::mpsc; use crate::compaction::compactor::{CompactionRegion, Compactor}; use crate::compaction::picker::{CompactionTask, PickerOutput}; use crate::error::CompactRegionSnafu; -use crate::manifest::action::RegionEdit; +use 
crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; +use crate::region::RegionLeaderState; use crate::request::{ - BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, - WorkerRequestWithTime, + BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult, + WorkerRequest, WorkerRequestWithTime, }; +use crate::sst::file::FileMeta; use crate::worker::WorkerListener; use crate::{error, metrics}; @@ -78,9 +81,93 @@ impl CompactionTaskImpl { .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting))); } - async fn handle_compaction(&mut self) -> error::Result { + /// Remove expired ssts files, update manifest immediately + /// and apply the edit to region version. + /// + /// This function logs errors but does not stop the compaction process if removal fails. + async fn remove_expired( + &self, + compaction_region: &CompactionRegion, + expired_files: Vec, + ) { + let region_id = compaction_region.region_id; + let expired_files_str = expired_files.iter().map(|f| f.file_id).join(","); + let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel(); + // Update manifest to remove expired SSTs + let edit = RegionEdit { + files_to_add: Vec::new(), + files_to_remove: expired_files, + timestamp_ms: Some(chrono::Utc::now().timestamp_millis()), + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + committed_sequence: None, + }; + + // 1. Update manifest + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + if let Err(e) = compaction_region + .manifest_ctx + .update_manifest(RegionLeaderState::Writable, action_list) + .await + { + error!( + e; + "Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue." + ); + return; + } + + // 2. 
Notify region worker loop to remove expired files from region version. + self.send_to_worker(WorkerRequest::Background { + region_id, + notify: BackgroundNotify::RegionEdit(RegionEditResult { + region_id, + sender: expire_delete_sender, + edit, + result: Ok(()), + }), + }) + .await; + + if let Err(e) = expire_delete_listener + .await + .context(error::RecvSnafu) + .flatten() + { + warn!( + e; + "Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue." + ); + return; + } + + info!( + "Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]" + ); + } + + async fn handle_expiration_and_compaction(&mut self) -> error::Result { self.mark_files_compacting(true); + // 1. In case of local compaction, we can delete expired ssts in advance. + if !self.picker_output.expired_ssts.is_empty() { + let remove_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["remove_expired"]) + .start_timer(); + let expired_ssts = self + .picker_output + .expired_ssts + .drain(..) + .map(|f| f.meta_ref().clone()) + .collect(); + // remove_expired logs errors but doesn't stop compaction + self.remove_expired(&self.compaction_region, expired_ssts) + .await; + remove_timer.observe_duration(); + } + + // 2. 
Merge inputs let merge_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["merge"]) .start_timer(); @@ -152,7 +239,7 @@ impl CompactionTaskImpl { #[async_trait::async_trait] impl CompactionTask for CompactionTaskImpl { async fn run(&mut self) { - let notify = match self.handle_compaction().await { + let notify = match self.handle_expiration_and_compaction().await { Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished { region_id: self.compaction_region.region_id, senders: std::mem::take(&mut self.waiters), @@ -178,3 +265,66 @@ impl CompactionTask for CompactionTaskImpl { .await; } } + +#[cfg(test)] +mod tests { + use store_api::storage::FileId; + + use crate::compaction::picker::PickerOutput; + use crate::compaction::test_util::new_file_handle; + + #[test] + fn test_picker_output_with_expired_ssts() { + // Test that PickerOutput correctly includes expired_ssts + // This verifies that expired SSTs are properly identified and included + // in the picker output, which is then handled by handle_expiration_and_compaction + + let file_ids = (0..3).map(|_| FileId::random()).collect::>(); + let expired_ssts = vec![ + new_file_handle(file_ids[0], 0, 999, 0), + new_file_handle(file_ids[1], 1000, 1999, 0), + ]; + + let picker_output = PickerOutput { + outputs: vec![], + expired_ssts: expired_ssts.clone(), + time_window_size: 3600, + max_file_size: None, + }; + + // Verify expired_ssts are included + assert_eq!(picker_output.expired_ssts.len(), 2); + assert_eq!( + picker_output.expired_ssts[0].file_id(), + expired_ssts[0].file_id() + ); + assert_eq!( + picker_output.expired_ssts[1].file_id(), + expired_ssts[1].file_id() + ); + } + + #[test] + fn test_picker_output_without_expired_ssts() { + // Test that PickerOutput works correctly when there are no expired SSTs + let picker_output = PickerOutput { + outputs: vec![], + expired_ssts: vec![], + time_window_size: 3600, + max_file_size: None, + }; + + // Verify empty expired_ssts + 
assert!(picker_output.expired_ssts.is_empty()); + } + + // Note: Testing remove_expired() directly requires extensive mocking of: + // - manifest_ctx (ManifestContext) + // - request_sender (mpsc::Sender) + // - WorkerRequest handling + // + // The behavior is tested indirectly through integration tests: + // - remove_expired() logs errors but doesn't stop compaction + // - handle_expiration_and_compaction() continues even if remove_expired() encounters errors + // - The function is designed to be non-blocking for compaction +} diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 5196eff6b1..371fb8f989 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -18,7 +18,7 @@ use std::fmt::Debug; use std::num::NonZeroU64; use common_base::readable_size::ReadableSize; -use common_telemetry::info; +use common_telemetry::{debug, info}; use common_time::Timestamp; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; @@ -48,6 +48,8 @@ pub struct TwcsPicker { pub max_output_file_size: Option, /// Whether the target region is in append mode. pub append_mode: bool, + /// Max background compaction tasks. + pub max_background_tasks: Option, } impl TwcsPicker { @@ -88,7 +90,7 @@ impl TwcsPicker { // because after compaction there will be no overlapping files. let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode; if found_runs == 0 { - return output; + continue; } let inputs = if found_runs > 1 { @@ -119,6 +121,16 @@ impl TwcsPicker { filter_deleted, output_time_range: None, // we do not enforce output time range in twcs compactions. 
}); + + if let Some(max_background_tasks) = self.max_background_tasks + && output.len() >= max_background_tasks + { + debug!( + "Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded", + region_id, max_background_tasks + ); + break; + } } } output @@ -680,6 +692,7 @@ mod tests { time_window_seconds: None, max_output_file_size: None, append_mode: false, + max_background_tasks: None, } .build_output(RegionId::from_u64(0), &mut windows, active_window); @@ -831,5 +844,185 @@ mod tests { } } + #[test] + fn test_build_output_multiple_windows_with_zero_runs() { + let file_ids = (0..6).map(|_| FileId::random()).collect::>(); + + let files = [ + // Window 0: Contains 3 files but not forming any runs (not enough files in sequence to reach trigger_file_num) + new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1), + new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2), + new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3), + // Window 3: Contains files that will form 2 runs + new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4), + new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5), + new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6), + ]; + + let mut windows = assign_to_windows(files.iter(), 3); + + // Create picker with trigger_file_num of 4 so single files won't form runs in first window + let picker = TwcsPicker { + trigger_file_num: 4, // High enough to prevent runs in first window + time_window_seconds: Some(3), + max_output_file_size: None, + append_mode: false, + max_background_tasks: None, + }; + + let active_window = find_latest_window_in_seconds(files.iter(), 3); + let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window); + + assert!( + !output.is_empty(), + "Should have output from windows with runs, even when one window has 0 runs" + ); + + let all_output_files: Vec<_> = output + .iter() + .flat_map(|o| o.inputs.iter()) + .map(|f| 
f.file_id().file_id()) + .collect(); + + assert!( + all_output_files.contains(&file_ids[3]) + || all_output_files.contains(&file_ids[4]) + || all_output_files.contains(&file_ids[5]), + "Output should contain files from the window with runs" + ); + } + + #[test] + fn test_build_output_single_window_zero_runs() { + let file_ids = (0..2).map(|_| FileId::random()).collect::>(); + + let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000); // 2000 bytes + let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500); // 2500 bytes + + let files = [large_file_1, large_file_2]; + + let mut windows = assign_to_windows(files.iter(), 3); + + let picker = TwcsPicker { + trigger_file_num: 2, + time_window_seconds: Some(3), + max_output_file_size: Some(1000), + append_mode: true, + max_background_tasks: None, + }; + + let active_window = find_latest_window_in_seconds(files.iter(), 3); + let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window); + + // Should return empty output (no compaction needed) + assert!( + output.is_empty(), + "Should return empty output when no runs are found after filtering" + ); + } + + #[test] + fn test_max_background_tasks_truncation() { + let file_ids = (0..10).map(|_| FileId::random()).collect::>(); + let max_background_tasks = 3; + + // Create files across multiple windows that will generate multiple compaction outputs + let files = [ + // Window 0: 4 files that will form a run + new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1), + new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2), + new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3), + new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4), + // Window 3: 4 files that will form another run + new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5), + new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6), + new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7), + 
new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8), + // Window 6: 4 files that will form another run + new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9), + new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10), + ]; + + let mut windows = assign_to_windows(files.iter(), 3); + + let picker = TwcsPicker { + trigger_file_num: 4, + time_window_seconds: Some(3), + max_output_file_size: None, + append_mode: false, + max_background_tasks: Some(max_background_tasks), + }; + + let active_window = find_latest_window_in_seconds(files.iter(), 3); + let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window); + + // Should have at most max_background_tasks outputs + assert!( + output.len() <= max_background_tasks, + "Output should be truncated to max_background_tasks: expected <= {}, got {}", + max_background_tasks, + output.len() + ); + + // Without max_background_tasks, should have more outputs + let picker_no_limit = TwcsPicker { + trigger_file_num: 4, + time_window_seconds: Some(3), + max_output_file_size: None, + append_mode: false, + max_background_tasks: None, + }; + + let mut windows_no_limit = assign_to_windows(files.iter(), 3); + let output_no_limit = picker_no_limit.build_output( + RegionId::from_u64(123), + &mut windows_no_limit, + active_window, + ); + + // Without limit, should have more outputs (if there are enough windows) + if output_no_limit.len() > max_background_tasks { + assert!( + output_no_limit.len() > output.len(), + "Without limit should have more outputs than with limit" + ); + } + } + + #[test] + fn test_max_background_tasks_no_truncation_when_under_limit() { + let file_ids = (0..4).map(|_| FileId::random()).collect::>(); + let max_background_tasks = 10; // Larger than expected outputs + + // Create files in one window that will generate one compaction output + let files = [ + new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1), + new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2), + 
new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3), + new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4), + ]; + + let mut windows = assign_to_windows(files.iter(), 3); + + let picker = TwcsPicker { + trigger_file_num: 4, + time_window_seconds: Some(3), + max_output_file_size: None, + append_mode: false, + max_background_tasks: Some(max_background_tasks), + }; + + let active_window = find_latest_window_in_seconds(files.iter(), 3); + let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window); + + // Should have all outputs since we're under the limit + assert!( + output.len() <= max_background_tasks, + "Output should be within limit" + ); + // Should have at least one output + assert!(!output.is_empty(), "Should have at least one output"); + } + // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected. } diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index a3811f013f..7ddac4ee0d 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -40,7 +40,6 @@ use crate::memtable::{ BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges, MemtableRef, MemtableStats, RangesOptions, }; -use crate::read::scan_region::PredicateGroup; /// Empty memtable for test. #[derive(Debug, Default)]