Skip to main content

mito2/compaction/
compactor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::key::SchemaMetadataManagerRef;
20use common_telemetry::{info, warn};
21use common_time::TimeToLive;
22use either::Either;
23use itertools::Itertools;
24use object_store::manager::ObjectStoreManagerRef;
25use partition::expr::PartitionExpr;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::region_request::PathType;
30use store_api::storage::RegionId;
31
32use crate::access_layer::{
33    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
34};
35use crate::cache::{CacheManager, CacheManagerRef};
36use crate::compaction::picker::PickerOutput;
37use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
38use crate::config::MitoConfig;
39use crate::error;
40use crate::error::{
41    EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
42};
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
45use crate::read::FlatSource;
46use crate::region::options::RegionOptions;
47use crate::region::version::VersionRef;
48use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
49use crate::schedule::scheduler::LocalScheduler;
50use crate::sst::FormatType;
51use crate::sst::file::FileMeta;
52use crate::sst::file_purger::LocalFilePurger;
53use crate::sst::index::intermediate::IntermediateManager;
54use crate::sst::index::puffin_manager::PuffinManagerFactory;
55use crate::sst::location::region_dir_from_table_dir;
56use crate::sst::parquet::WriteOptions;
57use crate::sst::version::{SstVersion, SstVersionRef};
58
59/// Region version for compaction that does not hold memtables.
60#[derive(Clone)]
61pub struct CompactionVersion {
62    /// Metadata of the region.
63    ///
64    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
65    /// metadata and reuse metadata when creating a new `Version`.
66    pub(crate) metadata: RegionMetadataRef,
67    /// Options of the region.
68    pub(crate) options: RegionOptions,
69    /// SSTs of the region.
70    pub(crate) ssts: SstVersionRef,
71    /// Inferred compaction time window.
72    pub(crate) compaction_time_window: Option<Duration>,
73}
74
75impl From<VersionRef> for CompactionVersion {
76    fn from(value: VersionRef) -> Self {
77        Self {
78            metadata: value.metadata.clone(),
79            options: value.options.clone(),
80            ssts: value.ssts.clone(),
81            compaction_time_window: value.compaction_time_window,
82        }
83    }
84}
85
86/// CompactionRegion represents a region that needs to be compacted.
87/// It's the subset of MitoRegion.
88#[derive(Clone)]
89pub struct CompactionRegion {
90    pub region_id: RegionId,
91    pub region_options: RegionOptions,
92
93    pub(crate) engine_config: Arc<MitoConfig>,
94    pub(crate) region_metadata: RegionMetadataRef,
95    pub(crate) cache_manager: CacheManagerRef,
96    /// Access layer to get the table path and path type.
97    pub access_layer: AccessLayerRef,
98    pub(crate) manifest_ctx: Arc<ManifestContext>,
99    pub(crate) current_version: CompactionVersion,
100    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
101    pub(crate) ttl: Option<TimeToLive>,
102
103    /// Controls the parallelism of this compaction task. Default is 1.
104    ///
105    /// The parallel is inside this compaction task, not across different compaction tasks.
106    /// It can be different windows of the same compaction task or something like this.
107    pub max_parallelism: usize,
108}
109
110/// OpenCompactionRegionRequest represents the request to open a compaction region.
111#[derive(Debug, Clone)]
112pub struct OpenCompactionRegionRequest {
113    pub region_id: RegionId,
114    pub table_dir: String,
115    pub path_type: PathType,
116    pub region_options: RegionOptions,
117    pub max_parallelism: usize,
118}
119
120/// Open a compaction region from a compaction request.
121/// It's simple version of RegionOpener::open().
122pub async fn open_compaction_region(
123    req: &OpenCompactionRegionRequest,
124    mito_config: &MitoConfig,
125    object_store_manager: ObjectStoreManagerRef,
126    ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
127) -> Result<CompactionRegion> {
128    let object_store = {
129        let name = &req.region_options.storage;
130        if let Some(name) = name {
131            object_store_manager
132                .find(name)
133                .with_context(|| ObjectStoreNotFoundSnafu {
134                    object_store: name.clone(),
135                })?
136        } else {
137            object_store_manager.default_object_store()
138        }
139    };
140
141    let access_layer = {
142        let puffin_manager_factory = PuffinManagerFactory::new(
143            &mito_config.index.aux_path,
144            mito_config.index.staging_size.as_bytes(),
145            Some(mito_config.index.write_buffer_size.as_bytes() as _),
146            mito_config.index.staging_ttl,
147        )
148        .await?;
149        let intermediate_manager =
150            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
151
152        Arc::new(AccessLayer::new(
153            &req.table_dir,
154            req.path_type,
155            object_store.clone(),
156            puffin_manager_factory,
157            intermediate_manager,
158        ))
159    };
160
161    let manifest_manager = {
162        let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
163        let region_manifest_options =
164            RegionManifestOptions::new(mito_config, &region_dir, object_store);
165
166        RegionManifestManager::open(region_manifest_options, &Default::default())
167            .await?
168            .with_context(|| EmptyRegionDirSnafu {
169                region_id: req.region_id,
170                region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
171            })?
172    };
173
174    let manifest = manifest_manager.manifest();
175    let region_metadata = manifest.metadata.clone();
176    let manifest_ctx = Arc::new(ManifestContext::new(
177        manifest_manager,
178        RegionRoleState::Leader(RegionLeaderState::Writable),
179    ));
180
181    let file_purger = {
182        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
183        Arc::new(LocalFilePurger::new(
184            purge_scheduler.clone(),
185            access_layer.clone(),
186            None,
187        ))
188    };
189
190    let current_version = {
191        let mut ssts = SstVersion::new();
192        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
193        CompactionVersion {
194            metadata: region_metadata.clone(),
195            options: req.region_options.clone(),
196            ssts: Arc::new(ssts),
197            compaction_time_window: manifest.compaction_time_window,
198        }
199    };
200
201    let ttl = match ttl_provider {
202        // Use the specified ttl.
203        Either::Left(ttl) => ttl,
204        // Get the ttl from the schema metadata manager.
205        Either::Right(schema_metadata_manager) => {
206            let (_, ttl) = find_dynamic_options(
207                req.region_id.table_id(),
208                &req.region_options,
209                &schema_metadata_manager,
210            )
211            .await
212            .unwrap_or_else(|e| {
213                warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
214                (
215                    crate::region::options::CompactionOptions::default(),
216                    TimeToLive::default(),
217                )
218            });
219            ttl
220        }
221    };
222
223    Ok(CompactionRegion {
224        region_id: req.region_id,
225        region_options: req.region_options.clone(),
226        engine_config: Arc::new(mito_config.clone()),
227        region_metadata: region_metadata.clone(),
228        cache_manager: Arc::new(CacheManager::default()),
229        access_layer,
230        manifest_ctx,
231        current_version,
232        file_purger: Some(file_purger),
233        ttl: Some(ttl),
234        max_parallelism: req.max_parallelism,
235    })
236}
237
238impl CompactionRegion {
239    /// Get the file purger of the compaction region.
240    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
241        self.file_purger.clone()
242    }
243
244    /// Stop the file purger scheduler of the compaction region.
245    pub async fn stop_purger_scheduler(&self) -> Result<()> {
246        if let Some(file_purger) = &self.file_purger {
247            file_purger.stop_scheduler().await
248        } else {
249            Ok(())
250        }
251    }
252}
253
254/// `[MergeOutput]` represents the output of merging SST files.
255#[derive(Default, Clone, Debug, Serialize, Deserialize)]
256pub struct MergeOutput {
257    pub files_to_add: Vec<FileMeta>,
258    pub files_to_remove: Vec<FileMeta>,
259    pub compaction_time_window: Option<i64>,
260}
261
262impl MergeOutput {
263    pub fn is_empty(&self) -> bool {
264        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
265    }
266
267    pub fn input_file_size(&self) -> u64 {
268        self.files_to_remove.iter().map(|f| f.file_size).sum()
269    }
270
271    pub fn output_file_size(&self) -> u64 {
272        self.files_to_add.iter().map(|f| f.file_size).sum()
273    }
274}
275
276/// Compactor is the trait that defines the compaction logic.
277#[async_trait::async_trait]
278pub trait Compactor: Send + Sync + 'static {
279    /// Merge SST files for a region.
280    async fn merge_ssts(
281        &self,
282        compaction_region: &CompactionRegion,
283        picker_output: PickerOutput,
284    ) -> Result<MergeOutput>;
285
286    /// Update the manifest after merging SST files.
287    async fn update_manifest(
288        &self,
289        compaction_region: &CompactionRegion,
290        merge_output: MergeOutput,
291    ) -> Result<RegionEdit>;
292}
293
294/// Trait for merging a single compaction output into SST files.
295///
296/// This is extracted from `DefaultCompactor` to allow injecting mock
297/// implementations in tests.
298#[async_trait::async_trait]
299pub trait SstMerger: Send + Sync + 'static {
300    async fn merge_single_output(
301        &self,
302        compaction_region: CompactionRegion,
303        output: CompactionOutput,
304        write_opts: WriteOptions,
305    ) -> Result<Vec<FileMeta>>;
306}
307
/// [`SstMerger`] used in production.
///
/// It reads the input SSTs, merges them, and writes the merged data out as new SSTs.
#[derive(Clone)]
pub struct DefaultSstMerger;
311
312#[async_trait::async_trait]
313impl SstMerger for DefaultSstMerger {
314    async fn merge_single_output(
315        &self,
316        compaction_region: CompactionRegion,
317        output: CompactionOutput,
318        write_opts: WriteOptions,
319    ) -> Result<Vec<FileMeta>> {
320        let region_id = compaction_region.region_id;
321        let storage = compaction_region.region_options.storage.clone();
322        let index_options = compaction_region
323            .current_version
324            .options
325            .index_options
326            .clone();
327        let append_mode = compaction_region.current_version.options.append_mode;
328        let merge_mode = compaction_region.current_version.options.merge_mode();
329        let flat_format = compaction_region
330            .region_options
331            .sst_format
332            .map(|format| format == FormatType::Flat)
333            .unwrap_or(compaction_region.engine_config.default_flat_format);
334
335        let index_config = compaction_region.engine_config.index.clone();
336        let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
337        let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
338        let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
339        #[cfg(feature = "vector_index")]
340        let vector_index_config = compaction_region.engine_config.vector_index.clone();
341
342        let input_file_names = output
343            .inputs
344            .iter()
345            .map(|f| f.file_id().to_string())
346            .join(",");
347        let max_sequence = output
348            .inputs
349            .iter()
350            .map(|f| f.meta_ref().sequence)
351            .max()
352            .flatten();
353        let builder = CompactionSstReaderBuilder {
354            metadata: compaction_region.region_metadata.clone(),
355            sst_layer: compaction_region.access_layer.clone(),
356            cache: compaction_region.cache_manager.clone(),
357            inputs: &output.inputs,
358            append_mode,
359            filter_deleted: output.filter_deleted,
360            time_range: output.output_time_range,
361            merge_mode,
362        };
363        let reader = builder.build_flat_sst_reader().await?;
364        let source = FlatSource::Stream(reader);
365        let mut metrics = Metrics::new(WriteType::Compaction);
366        let region_metadata = compaction_region.region_metadata.clone();
367        let sst_infos = compaction_region
368            .access_layer
369            .write_sst(
370                SstWriteRequest {
371                    op_type: OperationType::Compact,
372                    metadata: region_metadata.clone(),
373                    source,
374                    cache_manager: compaction_region.cache_manager.clone(),
375                    storage,
376                    max_sequence: max_sequence.map(NonZero::get),
377                    sst_write_format: if flat_format {
378                        FormatType::Flat
379                    } else {
380                        FormatType::PrimaryKey
381                    },
382                    index_options,
383                    index_config,
384                    inverted_index_config,
385                    fulltext_index_config,
386                    bloom_filter_index_config,
387                    #[cfg(feature = "vector_index")]
388                    vector_index_config,
389                },
390                &write_opts,
391                &mut metrics,
392            )
393            .await?;
394        // Convert partition expression once outside the map
395        let partition_expr = match &region_metadata.partition_expr {
396            None => None,
397            Some(json_str) if json_str.is_empty() => None,
398            Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
399                InvalidPartitionExprSnafu {
400                    expr: json_str.clone(),
401                }
402            })?,
403        };
404
405        let output_files = sst_infos
406            .into_iter()
407            .map(|sst_info| FileMeta {
408                region_id,
409                file_id: sst_info.file_id,
410                time_range: sst_info.time_range,
411                level: output.output_level,
412                file_size: sst_info.file_size,
413                max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
414                available_indexes: sst_info.index_metadata.build_available_indexes(),
415                indexes: sst_info.index_metadata.build_indexes(),
416                index_file_size: sst_info.index_metadata.file_size,
417                index_version: 0,
418                num_rows: sst_info.num_rows as u64,
419                num_row_groups: sst_info.num_row_groups,
420                sequence: max_sequence,
421                partition_expr: partition_expr.clone(),
422                num_series: sst_info.num_series,
423            })
424            .collect::<Vec<_>>();
425        let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
426        info!(
427            "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
428            region_id, input_file_names, output_file_names, flat_format, metrics
429        );
430        metrics.observe();
431        Ok(output_files)
432    }
433}
434
435/// DefaultCompactor is the default implementation of Compactor.
436///
437/// It is parameterized by an [`SstMerger`] to allow injecting mock
438/// implementations in tests.
439pub struct DefaultCompactor<M = DefaultSstMerger> {
440    merger: M,
441}
442
443impl Default for DefaultCompactor {
444    fn default() -> Self {
445        Self {
446            merger: DefaultSstMerger,
447        }
448    }
449}
450
451impl<M: SstMerger> DefaultCompactor<M> {
452    pub fn with_merger(merger: M) -> Self {
453        Self { merger }
454    }
455}
456
457#[async_trait::async_trait]
458impl<M: SstMerger> Compactor for DefaultCompactor<M>
459where
460    M: Clone,
461{
462    async fn merge_ssts(
463        &self,
464        compaction_region: &CompactionRegion,
465        mut picker_output: PickerOutput,
466    ) -> Result<MergeOutput> {
467        let internal_parallelism = compaction_region.max_parallelism.max(1);
468        let compaction_time_window = picker_output.time_window_size;
469        let region_id = compaction_region.region_id;
470
471        // Build tasks along with their input file metas so we can track which
472        // inputs correspond to each task.
473        let mut tasks: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(picker_output.outputs.len());
474
475        for output in picker_output.outputs.drain(..) {
476            let inputs_to_remove: Vec<_> =
477                output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
478            let write_opts = WriteOptions {
479                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
480                max_file_size: picker_output.max_file_size,
481                ..Default::default()
482            };
483            let merger = self.merger.clone();
484            let compaction_region = compaction_region.clone();
485            let fut = async move {
486                merger
487                    .merge_single_output(compaction_region, output, write_opts)
488                    .await
489            };
490            tasks.push((inputs_to_remove, fut));
491        }
492
493        let mut output_files = Vec::with_capacity(tasks.len());
494        let mut compacted_inputs = Vec::with_capacity(
495            tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
496                + picker_output.expired_ssts.len(),
497        );
498
499        while !tasks.is_empty() {
500            let mut chunk: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(internal_parallelism);
501            for _ in 0..internal_parallelism {
502                if let Some(task) = tasks.pop() {
503                    chunk.push(task);
504                }
505            }
506            let spawned: Vec<_> = chunk
507                .into_iter()
508                .map(|(inputs, fut)| {
509                    let handle = common_runtime::spawn_compact(fut);
510                    (inputs, handle)
511                })
512                .collect();
513
514            for (inputs, handle) in spawned {
515                match handle.await {
516                    Ok(Ok(files)) => {
517                        output_files.extend(files);
518                        compacted_inputs.extend(inputs);
519                    }
520                    Ok(Err(e)) => {
521                        warn!(
522                            e; "Region {} failed to merge compaction output with inputs: [{}], skipping",
523                            region_id,
524                            inputs.iter().map(|f| f.file_id.to_string()).join(",")
525                        );
526                    }
527                    Err(e) => {
528                        warn!(
529                            "Region {} compaction task join error for inputs: [{}], skipping: {}",
530                            region_id,
531                            inputs.iter().map(|f| f.file_id.to_string()).join(","),
532                            e
533                        );
534                        return Err(e).context(error::JoinSnafu);
535                    }
536                }
537            }
538        }
539
540        // Include expired SSTs in removals — these don't depend on merge success.
541        compacted_inputs.extend(
542            picker_output
543                .expired_ssts
544                .iter()
545                .map(|f| f.meta_ref().clone()),
546        );
547
548        Ok(MergeOutput {
549            files_to_add: output_files,
550            files_to_remove: compacted_inputs,
551            compaction_time_window: Some(compaction_time_window),
552        })
553    }
554
555    async fn update_manifest(
556        &self,
557        compaction_region: &CompactionRegion,
558        merge_output: MergeOutput,
559    ) -> Result<RegionEdit> {
560        // Write region edit to manifest.
561        let edit = RegionEdit {
562            files_to_add: merge_output.files_to_add,
563            files_to_remove: merge_output.files_to_remove,
564            // Use current timestamp as the edit timestamp.
565            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
566            compaction_time_window: merge_output
567                .compaction_time_window
568                .map(|seconds| Duration::from_secs(seconds as u64)),
569            flushed_entry_id: None,
570            flushed_sequence: None,
571            committed_sequence: None,
572        };
573
574        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
575        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
576        compaction_region
577            .manifest_ctx
578            .update_manifest(RegionLeaderState::Writable, action_list, false)
579            .await?;
580
581        Ok(edit)
582    }
583}
584
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::cache::CacheManager;
    use crate::compaction::picker::PickerOutput;
    use crate::sst::file::FileHandle;
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::version::SstVersion;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    fn dummy_file_meta() -> FileMeta {
        FileMeta {
            region_id: RegionId::new(1, 1),
            file_id: FileId::random(),
            file_size: 100,
            ..Default::default()
        }
    }

    fn new_file_handle(meta: FileMeta) -> FileHandle {
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    /// Builds a level-1 [`CompactionOutput`] whose only input is `input`.
    fn single_input_output(input: &FileMeta) -> CompactionOutput {
        CompactionOutput {
            output_level: 1,
            inputs: vec![new_file_handle(input.clone())],
            filter_deleted: false,
            output_time_range: None,
        }
    }

    /// Collects the file ids of `files`.
    fn file_ids(files: &[FileMeta]) -> Vec<FileId> {
        files.iter().map(|f| f.file_id).collect()
    }

    /// Build a minimal [`CompactionRegion`] suitable for tests where the
    /// [`SstMerger`] is mocked and never touches the access layer.
    async fn new_test_compaction_region() -> CompactionRegion {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        CompactionRegion {
            region_id: RegionId::new(1, 1),
            region_options: RegionOptions::default(),
            engine_config: Arc::new(MitoConfig::default()),
            region_metadata: metadata.clone(),
            cache_manager: Arc::new(CacheManager::default()),
            access_layer: env.access_layer.clone(),
            manifest_ctx,
            current_version: CompactionVersion {
                metadata,
                options: RegionOptions::default(),
                ssts: Arc::new(SstVersion::new()),
                compaction_time_window: None,
            },
            file_purger: None,
            ttl: None,
            max_parallelism: 1,
        }
    }

    /// An [`SstMerger`] whose outcome is configured per compaction output.
    ///
    /// Outcomes are keyed by the file id of each output's first input, not by call
    /// order, because `DefaultCompactor` pops its tasks from the back and therefore
    /// does not merge outputs in the order the picker produced them.
    /// - `Some(files)` makes the merge succeed with `files`.
    /// - `None` makes it fail.
    #[derive(Clone)]
    struct MockMerger {
        results: Arc<HashMap<FileId, Option<Vec<FileMeta>>>>,
    }

    impl MockMerger {
        fn new(results: Vec<(FileId, Option<Vec<FileMeta>>)>) -> Self {
            Self {
                results: Arc::new(results.into_iter().collect()),
            }
        }
    }

    #[async_trait::async_trait]
    impl SstMerger for MockMerger {
        async fn merge_single_output(
            &self,
            _compaction_region: CompactionRegion,
            output: CompactionOutput,
            _write_opts: WriteOptions,
        ) -> Result<Vec<FileMeta>> {
            let key = output
                .inputs
                .first()
                .map(|f| f.meta_ref().file_id)
                .expect("MockMerger: compaction output has no inputs");
            match self.results.get(&key) {
                Some(Some(files)) => Ok(files.clone()),
                Some(None) => error::InvalidMetaSnafu {
                    reason: format!("simulated failure for input {key}"),
                }
                .fail(),
                None => panic!("MockMerger: no result configured for input {key}"),
            }
        }
    }

    #[tokio::test]
    async fn test_partial_merge_failure_collects_only_successful_outputs() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;

        // Prepare 3 compaction outputs: output 0 and 2 succeed, output 1 fails.
        let input_meta_0 = dummy_file_meta();
        let input_meta_1 = dummy_file_meta();
        let input_meta_2 = dummy_file_meta();

        let output_meta_0 = vec![dummy_file_meta()];
        let output_meta_2 = vec![dummy_file_meta(), dummy_file_meta()];

        let merger = MockMerger::new(vec![
            (input_meta_0.file_id, Some(output_meta_0.clone())),
            (input_meta_1.file_id, None),
            (input_meta_2.file_id, Some(output_meta_2.clone())),
        ]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![
                single_input_output(&input_meta_0),
                single_input_output(&input_meta_1),
                single_input_output(&input_meta_2),
            ],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        // Outputs 0 and 2 succeeded (1 + 2 = 3 files added).
        let added_ids = file_ids(&merge_output.files_to_add);
        assert_eq!(added_ids.len(), 3);
        for meta in output_meta_0.iter().chain(&output_meta_2) {
            assert!(added_ids.contains(&meta.file_id));
        }

        // Only inputs from successful merges should be removed.
        let removed_ids = file_ids(&merge_output.files_to_remove);
        assert_eq!(removed_ids.len(), 2);
        assert!(removed_ids.contains(&input_meta_0.file_id));
        assert!(removed_ids.contains(&input_meta_2.file_id));
        // The failed output's input must NOT be removed.
        assert!(!removed_ids.contains(&input_meta_1.file_id));
    }

    #[tokio::test]
    async fn test_all_outputs_succeed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let output_meta = vec![dummy_file_meta()];

        let merger = MockMerger::new(vec![(input_meta.file_id, Some(output_meta.clone()))]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![single_input_output(&input_meta)],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        assert_eq!(merge_output.files_to_add.len(), 1);
        assert_eq!(merge_output.files_to_add[0].file_id, output_meta[0].file_id);
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(merge_output.files_to_remove[0].file_id, input_meta.file_id);
        assert_eq!(merge_output.compaction_time_window, Some(3600));
    }

    #[tokio::test]
    async fn test_expired_ssts_always_removed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let expired_meta = dummy_file_meta();

        // The single merge output fails, but expired SSTs should still be removed.
        let merger = MockMerger::new(vec![(input_meta.file_id, None)]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![single_input_output(&input_meta)],
            expired_ssts: vec![new_file_handle(expired_meta.clone())],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        // No files added (merge failed).
        assert!(merge_output.files_to_add.is_empty());
        // Only the expired SST should be in files_to_remove (not the failed merge's input).
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(
            merge_output.files_to_remove[0].file_id,
            expired_meta.file_id
        );
    }

    #[tokio::test]
    async fn test_parallel_merges_skip_only_failed_outputs() {
        common_telemetry::init_default_ut_logging();

        let mut compaction_region = new_test_compaction_region().await;
        // Three outputs with a parallelism of two: one full batch plus a partial one.
        compaction_region.max_parallelism = 2;

        let inputs: Vec<_> = (0..3).map(|_| dummy_file_meta()).collect();
        let outputs: Vec<_> = (0..3).map(|_| vec![dummy_file_meta()]).collect();

        let merger = MockMerger::new(vec![
            (inputs[0].file_id, Some(outputs[0].clone())),
            (inputs[1].file_id, Some(outputs[1].clone())),
            (inputs[2].file_id, None),
        ]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: inputs.iter().map(single_input_output).collect(),
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        let added_ids = file_ids(&merge_output.files_to_add);
        assert_eq!(added_ids.len(), 2);
        assert!(added_ids.contains(&outputs[0][0].file_id));
        assert!(added_ids.contains(&outputs[1][0].file_id));
        assert!(!added_ids.contains(&outputs[2][0].file_id));

        let removed_ids = file_ids(&merge_output.files_to_remove);
        assert_eq!(removed_ids.len(), 2);
        assert!(removed_ids.contains(&inputs[0].file_id));
        assert!(removed_ids.contains(&inputs[1].file_id));
        assert!(!removed_ids.contains(&inputs[2].file_id));
    }
}