Skip to main content

mito2/compaction/
compactor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_request::PathType;
31use store_api::storage::RegionId;
32
33use crate::access_layer::{
34    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
35};
36use crate::cache::{CacheManager, CacheManagerRef};
37use crate::compaction::picker::{PickerOutput, new_picker};
38use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
39use crate::config::MitoConfig;
40use crate::error::{
41    EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
42};
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
45use crate::read::FlatSource;
46use crate::region::options::RegionOptions;
47use crate::region::version::VersionRef;
48use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
49use crate::schedule::scheduler::LocalScheduler;
50use crate::sst::FormatType;
51use crate::sst::file::FileMeta;
52use crate::sst::file_purger::LocalFilePurger;
53use crate::sst::index::intermediate::IntermediateManager;
54use crate::sst::index::puffin_manager::PuffinManagerFactory;
55use crate::sst::location::region_dir_from_table_dir;
56use crate::sst::parquet::WriteOptions;
57use crate::sst::version::{SstVersion, SstVersionRef};
58use crate::{error, metrics};
59
/// Region version for compaction that does not hold memtables.
///
/// It bundles the region metadata, the region options, the SST files and the
/// compaction time window, and can be created from a [`VersionRef`] via `From`.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window.
    ///
    /// `None` when no window is recorded for this version.
    pub(crate) compaction_time_window: Option<Duration>,
}
75
76impl From<VersionRef> for CompactionVersion {
77    fn from(value: VersionRef) -> Self {
78        Self {
79            metadata: value.metadata.clone(),
80            options: value.options.clone(),
81            ssts: value.ssts.clone(),
82            compaction_time_window: value.compaction_time_window,
83        }
84    }
85}
86
/// CompactionRegion represents a region that needs to be compacted.
/// It's the subset of MitoRegion.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region to compact.
    pub region_id: RegionId,
    /// Options of the region.
    pub region_options: RegionOptions,

    /// Configuration of the mito engine.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager passed to the SST reader and writer during compaction.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer to get the table path and path type.
    pub access_layer: AccessLayerRef,
    /// Manifest context used to persist region edits.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// The version (metadata, options and SSTs) the compaction works on.
    pub(crate) current_version: CompactionVersion,
    /// Purger of the SST files of this region, if any.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// Time-to-live of the region data, if known.
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. Default is 1.
    ///
    /// The parallel is inside this compaction task, not across different compaction tasks.
    /// It can be different windows of the same compaction task or something like this.
    pub max_parallelism: usize,
}
110
/// OpenCompactionRegionRequest represents the request to open a compaction region.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Table directory; the region directory is derived from it together with
    /// `region_id` and `path_type`.
    pub table_dir: String,
    /// Path type used to build the access layer and the region directory.
    pub path_type: PathType,
    /// Options of the region. Its `storage` field selects the object store.
    pub region_options: RegionOptions,
    /// Parallelism inside the compaction task, copied into
    /// [`CompactionRegion::max_parallelism`].
    pub max_parallelism: usize,
}
120
121/// Open a compaction region from a compaction request.
122/// It's simple version of RegionOpener::open().
123pub async fn open_compaction_region(
124    req: &OpenCompactionRegionRequest,
125    mito_config: &MitoConfig,
126    object_store_manager: ObjectStoreManagerRef,
127    ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
128) -> Result<CompactionRegion> {
129    let object_store = {
130        let name = &req.region_options.storage;
131        if let Some(name) = name {
132            object_store_manager
133                .find(name)
134                .with_context(|| ObjectStoreNotFoundSnafu {
135                    object_store: name.clone(),
136                })?
137        } else {
138            object_store_manager.default_object_store()
139        }
140    };
141
142    let access_layer = {
143        let puffin_manager_factory = PuffinManagerFactory::new(
144            &mito_config.index.aux_path,
145            mito_config.index.staging_size.as_bytes(),
146            Some(mito_config.index.write_buffer_size.as_bytes() as _),
147            mito_config.index.staging_ttl,
148        )
149        .await?;
150        let intermediate_manager =
151            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
152
153        Arc::new(AccessLayer::new(
154            &req.table_dir,
155            req.path_type,
156            object_store.clone(),
157            puffin_manager_factory,
158            intermediate_manager,
159        ))
160    };
161
162    let manifest_manager = {
163        let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
164        let region_manifest_options =
165            RegionManifestOptions::new(mito_config, &region_dir, object_store);
166
167        RegionManifestManager::open(region_manifest_options, &Default::default())
168            .await?
169            .with_context(|| EmptyRegionDirSnafu {
170                region_id: req.region_id,
171                region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
172            })?
173    };
174
175    let manifest = manifest_manager.manifest();
176    let region_metadata = manifest.metadata.clone();
177    let manifest_ctx = Arc::new(ManifestContext::new(
178        manifest_manager,
179        RegionRoleState::Leader(RegionLeaderState::Writable),
180    ));
181
182    let file_purger = {
183        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
184        Arc::new(LocalFilePurger::new(
185            purge_scheduler.clone(),
186            access_layer.clone(),
187            None,
188        ))
189    };
190
191    let current_version = {
192        let mut ssts = SstVersion::new();
193        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
194        CompactionVersion {
195            metadata: region_metadata.clone(),
196            options: req.region_options.clone(),
197            ssts: Arc::new(ssts),
198            compaction_time_window: manifest.compaction_time_window,
199        }
200    };
201
202    let ttl = match ttl_provider {
203        // Use the specified ttl.
204        Either::Left(ttl) => ttl,
205        // Get the ttl from the schema metadata manager.
206        Either::Right(schema_metadata_manager) => {
207            let (_, ttl) = find_dynamic_options(
208                req.region_id.table_id(),
209                &req.region_options,
210                &schema_metadata_manager,
211            )
212            .await
213            .unwrap_or_else(|e| {
214                warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
215                (
216                    crate::region::options::CompactionOptions::default(),
217                    TimeToLive::default(),
218                )
219            });
220            ttl
221        }
222    };
223
224    Ok(CompactionRegion {
225        region_id: req.region_id,
226        region_options: req.region_options.clone(),
227        engine_config: Arc::new(mito_config.clone()),
228        region_metadata: region_metadata.clone(),
229        cache_manager: Arc::new(CacheManager::default()),
230        access_layer,
231        manifest_ctx,
232        current_version,
233        file_purger: Some(file_purger),
234        ttl: Some(ttl),
235        max_parallelism: req.max_parallelism,
236    })
237}
238
239impl CompactionRegion {
240    /// Get the file purger of the compaction region.
241    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
242        self.file_purger.clone()
243    }
244
245    /// Stop the file purger scheduler of the compaction region.
246    pub async fn stop_purger_scheduler(&self) -> Result<()> {
247        if let Some(file_purger) = &self.file_purger {
248            file_purger.stop_scheduler().await
249        } else {
250            Ok(())
251        }
252    }
253}
254
/// `[MergeOutput]` represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Files produced by the merge that should be added to the region.
    pub files_to_add: Vec<FileMeta>,
    /// Files that should be removed from the region.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds, if any.
    pub compaction_time_window: Option<i64>,
}
262
263impl MergeOutput {
264    pub fn is_empty(&self) -> bool {
265        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
266    }
267
268    pub fn input_file_size(&self) -> u64 {
269        self.files_to_remove.iter().map(|f| f.file_size).sum()
270    }
271
272    pub fn output_file_size(&self) -> u64 {
273        self.files_to_add.iter().map(|f| f.file_size).sum()
274    }
275}
276
/// Compactor is the trait that defines the compaction logic.
///
/// A compaction consists of two steps that can also be run separately:
/// merging the picked SST files and recording the result in the manifest.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    ///
    /// `picker_output` describes which files to merge. The returned
    /// [`MergeOutput`] lists the files to add and the files to remove.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    ///
    /// Returns the [`RegionEdit`] built from `merge_output`.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Execute compaction for a region.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
301
/// Trait for merging a single compaction output into SST files.
///
/// This is extracted from `DefaultCompactor` to allow injecting mock
/// implementations in tests.
#[async_trait::async_trait]
pub trait SstMerger: Send + Sync + 'static {
    /// Merges the input files of `output` and returns the metadata of the
    /// SST files produced by the merge.
    ///
    /// All arguments are taken by value.
    async fn merge_single_output(
        &self,
        compaction_region: CompactionRegion,
        output: CompactionOutput,
        write_opts: WriteOptions,
    ) -> Result<Vec<FileMeta>>;
}
315
/// The production [`SstMerger`] that reads, merges, and writes SST files.
///
/// It is a stateless unit struct, so cloning it is free.
#[derive(Clone)]
pub struct DefaultSstMerger;
319
320#[async_trait::async_trait]
321impl SstMerger for DefaultSstMerger {
322    async fn merge_single_output(
323        &self,
324        compaction_region: CompactionRegion,
325        output: CompactionOutput,
326        write_opts: WriteOptions,
327    ) -> Result<Vec<FileMeta>> {
328        let region_id = compaction_region.region_id;
329        let storage = compaction_region.region_options.storage.clone();
330        let index_options = compaction_region
331            .current_version
332            .options
333            .index_options
334            .clone();
335        let append_mode = compaction_region.current_version.options.append_mode;
336        let merge_mode = compaction_region.current_version.options.merge_mode();
337        let flat_format = compaction_region
338            .region_options
339            .sst_format
340            .map(|format| format == FormatType::Flat)
341            .unwrap_or(compaction_region.engine_config.default_flat_format);
342
343        let index_config = compaction_region.engine_config.index.clone();
344        let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
345        let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
346        let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
347        #[cfg(feature = "vector_index")]
348        let vector_index_config = compaction_region.engine_config.vector_index.clone();
349
350        let input_file_names = output
351            .inputs
352            .iter()
353            .map(|f| f.file_id().to_string())
354            .join(",");
355        let max_sequence = output
356            .inputs
357            .iter()
358            .map(|f| f.meta_ref().sequence)
359            .max()
360            .flatten();
361        let builder = CompactionSstReaderBuilder {
362            metadata: compaction_region.region_metadata.clone(),
363            sst_layer: compaction_region.access_layer.clone(),
364            cache: compaction_region.cache_manager.clone(),
365            inputs: &output.inputs,
366            append_mode,
367            filter_deleted: output.filter_deleted,
368            time_range: output.output_time_range,
369            merge_mode,
370        };
371        let reader = builder.build_flat_sst_reader().await?;
372        let source = FlatSource::Stream(reader);
373        let mut metrics = Metrics::new(WriteType::Compaction);
374        let region_metadata = compaction_region.region_metadata.clone();
375        let sst_infos = compaction_region
376            .access_layer
377            .write_sst(
378                SstWriteRequest {
379                    op_type: OperationType::Compact,
380                    metadata: region_metadata.clone(),
381                    source,
382                    cache_manager: compaction_region.cache_manager.clone(),
383                    storage,
384                    max_sequence: max_sequence.map(NonZero::get),
385                    sst_write_format: if flat_format {
386                        FormatType::Flat
387                    } else {
388                        FormatType::PrimaryKey
389                    },
390                    index_options,
391                    index_config,
392                    inverted_index_config,
393                    fulltext_index_config,
394                    bloom_filter_index_config,
395                    #[cfg(feature = "vector_index")]
396                    vector_index_config,
397                },
398                &write_opts,
399                &mut metrics,
400            )
401            .await?;
402        // Convert partition expression once outside the map
403        let partition_expr = match &region_metadata.partition_expr {
404            None => None,
405            Some(json_str) if json_str.is_empty() => None,
406            Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
407                InvalidPartitionExprSnafu {
408                    expr: json_str.clone(),
409                }
410            })?,
411        };
412
413        let output_files = sst_infos
414            .into_iter()
415            .map(|sst_info| FileMeta {
416                region_id,
417                file_id: sst_info.file_id,
418                time_range: sst_info.time_range,
419                level: output.output_level,
420                file_size: sst_info.file_size,
421                max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
422                available_indexes: sst_info.index_metadata.build_available_indexes(),
423                indexes: sst_info.index_metadata.build_indexes(),
424                index_file_size: sst_info.index_metadata.file_size,
425                index_version: 0,
426                num_rows: sst_info.num_rows as u64,
427                num_row_groups: sst_info.num_row_groups,
428                sequence: max_sequence,
429                partition_expr: partition_expr.clone(),
430                num_series: sst_info.num_series,
431            })
432            .collect::<Vec<_>>();
433        let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
434        info!(
435            "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
436            region_id, input_file_names, output_file_names, flat_format, metrics
437        );
438        metrics.observe();
439        Ok(output_files)
440    }
441}
442
/// DefaultCompactor is the default implementation of Compactor.
///
/// It is parameterized by an [`SstMerger`] to allow injecting mock
/// implementations in tests.
pub struct DefaultCompactor<M = DefaultSstMerger> {
    /// Merger that turns a single compaction output into SST files.
    merger: M,
}
450
451impl Default for DefaultCompactor {
452    fn default() -> Self {
453        Self {
454            merger: DefaultSstMerger,
455        }
456    }
457}
458
459impl<M: SstMerger> DefaultCompactor<M> {
460    pub fn with_merger(merger: M) -> Self {
461        Self { merger }
462    }
463}
464
465#[async_trait::async_trait]
466impl<M: SstMerger> Compactor for DefaultCompactor<M>
467where
468    M: Clone,
469{
470    async fn merge_ssts(
471        &self,
472        compaction_region: &CompactionRegion,
473        mut picker_output: PickerOutput,
474    ) -> Result<MergeOutput> {
475        let internal_parallelism = compaction_region.max_parallelism.max(1);
476        let compaction_time_window = picker_output.time_window_size;
477        let region_id = compaction_region.region_id;
478
479        // Build tasks along with their input file metas so we can track which
480        // inputs correspond to each task.
481        let mut tasks: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(picker_output.outputs.len());
482
483        for output in picker_output.outputs.drain(..) {
484            let inputs_to_remove: Vec<_> =
485                output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
486            let write_opts = WriteOptions {
487                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
488                max_file_size: picker_output.max_file_size,
489                ..Default::default()
490            };
491            let merger = self.merger.clone();
492            let compaction_region = compaction_region.clone();
493            let fut = async move {
494                merger
495                    .merge_single_output(compaction_region, output, write_opts)
496                    .await
497            };
498            tasks.push((inputs_to_remove, fut));
499        }
500
501        let mut output_files = Vec::with_capacity(tasks.len());
502        let mut compacted_inputs = Vec::with_capacity(
503            tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
504                + picker_output.expired_ssts.len(),
505        );
506
507        while !tasks.is_empty() {
508            let mut chunk: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(internal_parallelism);
509            for _ in 0..internal_parallelism {
510                if let Some(task) = tasks.pop() {
511                    chunk.push(task);
512                }
513            }
514            let spawned: Vec<_> = chunk
515                .into_iter()
516                .map(|(inputs, fut)| {
517                    let handle = common_runtime::spawn_compact(fut);
518                    (inputs, handle)
519                })
520                .collect();
521
522            for (inputs, handle) in spawned {
523                match handle.await {
524                    Ok(Ok(files)) => {
525                        output_files.extend(files);
526                        compacted_inputs.extend(inputs);
527                    }
528                    Ok(Err(e)) => {
529                        warn!(
530                            e; "Region {} failed to merge compaction output with inputs: [{}], skipping",
531                            region_id,
532                            inputs.iter().map(|f| f.file_id.to_string()).join(",")
533                        );
534                    }
535                    Err(e) => {
536                        warn!(
537                            "Region {} compaction task join error for inputs: [{}], skipping: {}",
538                            region_id,
539                            inputs.iter().map(|f| f.file_id.to_string()).join(","),
540                            e
541                        );
542                        return Err(e).context(error::JoinSnafu);
543                    }
544                }
545            }
546        }
547
548        // Include expired SSTs in removals — these don't depend on merge success.
549        compacted_inputs.extend(
550            picker_output
551                .expired_ssts
552                .iter()
553                .map(|f| f.meta_ref().clone()),
554        );
555
556        Ok(MergeOutput {
557            files_to_add: output_files,
558            files_to_remove: compacted_inputs,
559            compaction_time_window: Some(compaction_time_window),
560        })
561    }
562
563    async fn update_manifest(
564        &self,
565        compaction_region: &CompactionRegion,
566        merge_output: MergeOutput,
567    ) -> Result<RegionEdit> {
568        // Write region edit to manifest.
569        let edit = RegionEdit {
570            files_to_add: merge_output.files_to_add,
571            files_to_remove: merge_output.files_to_remove,
572            // Use current timestamp as the edit timestamp.
573            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
574            compaction_time_window: merge_output
575                .compaction_time_window
576                .map(|seconds| Duration::from_secs(seconds as u64)),
577            flushed_entry_id: None,
578            flushed_sequence: None,
579            committed_sequence: None,
580        };
581
582        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
583        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
584        compaction_region
585            .manifest_ctx
586            .update_manifest(RegionLeaderState::Writable, action_list, false)
587            .await?;
588
589        Ok(edit)
590    }
591
592    // The default implementation of compact combines the merge_ssts and update_manifest functions.
593    // Note: It's local compaction and only used for testing purpose.
594    async fn compact(
595        &self,
596        compaction_region: &CompactionRegion,
597        compact_request_options: compact_request::Options,
598    ) -> Result<()> {
599        let picker_output = {
600            let picker_output = new_picker(
601                &compact_request_options,
602                &compaction_region.region_options.compaction,
603                compaction_region.region_options.append_mode,
604                None,
605            )
606            .pick(compaction_region);
607
608            if let Some(picker_output) = picker_output {
609                picker_output
610            } else {
611                info!(
612                    "No files to compact for region_id: {}",
613                    compaction_region.region_id
614                );
615                return Ok(());
616            }
617        };
618
619        let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
620        if merge_output.is_empty() {
621            info!(
622                "No files to compact for region_id: {}",
623                compaction_region.region_id
624            );
625            return Ok(());
626        }
627
628        metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
629        metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
630        self.update_manifest(compaction_region, merge_output)
631            .await?;
632
633        Ok(())
634    }
635}
636
/// Tests for `DefaultCompactor::merge_ssts` using a mocked [`SstMerger`].
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::cache::CacheManager;
    use crate::compaction::picker::PickerOutput;
    use crate::sst::file::FileHandle;
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::version::SstVersion;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Creates a file meta with a random file id and a file size of 100.
    fn dummy_file_meta() -> FileMeta {
        FileMeta {
            region_id: RegionId::new(1, 1),
            file_id: FileId::random(),
            file_size: 100,
            ..Default::default()
        }
    }

    /// Wraps `meta` into a file handle that uses a no-op file purger.
    fn new_file_handle(meta: FileMeta) -> FileHandle {
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    /// Build a minimal [`CompactionRegion`] suitable for tests where the
    /// [`SstMerger`] is mocked and never touches the access layer.
    async fn new_test_compaction_region() -> CompactionRegion {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        CompactionRegion {
            region_id: RegionId::new(1, 1),
            region_options: RegionOptions::default(),
            engine_config: Arc::new(MitoConfig::default()),
            region_metadata: metadata.clone(),
            cache_manager: Arc::new(CacheManager::default()),
            access_layer: env.access_layer.clone(),
            manifest_ctx,
            current_version: CompactionVersion {
                metadata,
                options: RegionOptions::default(),
                ssts: Arc::new(SstVersion::new()),
                compaction_time_window: None,
            },
            file_purger: None,
            ttl: None,
            // Merge one output at a time so the call order is deterministic.
            max_parallelism: 1,
        }
    }

    /// An [`SstMerger`] that returns pre-configured results per call index.
    ///
    /// Call 0 gets `results[0]`, call 1 gets `results[1]`, etc.
    ///
    /// The index counts calls, not compaction outputs: which output receives
    /// which result depends on the order in which the compactor calls the
    /// merger.
    #[derive(Clone)]
    struct MockMerger {
        /// Results to hand out, indexed by call number.
        results: Arc<Mutex<Vec<Result<Vec<FileMeta>>>>>,
        /// Number of calls made so far, shared between clones.
        call_idx: Arc<AtomicUsize>,
    }

    impl MockMerger {
        /// Creates a merger that returns `results` in call order.
        fn new(results: Vec<Result<Vec<FileMeta>>>) -> Self {
            Self {
                results: Arc::new(Mutex::new(results)),
                call_idx: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait::async_trait]
    impl SstMerger for MockMerger {
        /// Ignores all arguments and returns the result configured for this call.
        ///
        /// A configured error is not returned as is; a new `InvalidMeta` error
        /// is created instead. Panics if no result is configured for the call.
        async fn merge_single_output(
            &self,
            _compaction_region: CompactionRegion,
            _output: CompactionOutput,
            _write_opts: WriteOptions,
        ) -> Result<Vec<FileMeta>> {
            let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
            match self.results.lock().unwrap().get(idx) {
                Some(Ok(files)) => Ok(files.clone()),
                Some(Err(_)) => error::InvalidMetaSnafu {
                    reason: format!("simulated failure at index {idx}"),
                }
                .fail(),
                None => panic!("MockMerger: no result configured for call index {idx}"),
            }
        }
    }

    /// A failed merge must neither add output files nor remove its inputs,
    /// while the other merges are still collected.
    #[tokio::test]
    async fn test_partial_merge_failure_collects_only_successful_outputs() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;

        // Prepare 3 compaction outputs: output 0 and 2 succeed, output 1 fails.
        //
        // Note: `MockMerger` hands out results by call order and `merge_ssts`
        // takes tasks from the back of its task list, so output 2 is merged
        // first and receives `results[0]`. The assertions below still hold
        // because the failing result sits in the middle.
        let input_meta_0 = dummy_file_meta();
        let input_meta_1 = dummy_file_meta();
        let input_meta_2 = dummy_file_meta();

        let output_meta_0 = vec![dummy_file_meta()];
        let output_meta_2 = vec![dummy_file_meta(), dummy_file_meta()];

        let merger = MockMerger::new(vec![
            Ok(output_meta_0.clone()),
            Err(error::InvalidMetaSnafu {
                reason: "boom".to_string(),
            }
            .build()),
            Ok(output_meta_2.clone()),
        ]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_0.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_1.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_2.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        // Outputs 0 and 2 succeeded (1 + 2 = 3 files added).
        assert_eq!(merge_output.files_to_add.len(), 3);
        // Only inputs from successful merges should be removed.
        assert_eq!(merge_output.files_to_remove.len(), 2);

        let removed_ids: Vec<_> = merge_output
            .files_to_remove
            .iter()
            .map(|f| f.file_id)
            .collect();
        assert!(removed_ids.contains(&input_meta_0.file_id));
        assert!(removed_ids.contains(&input_meta_2.file_id));
        // The failed output's input must NOT be removed.
        assert!(!removed_ids.contains(&input_meta_1.file_id));
    }

    /// A single successful merge adds its output and removes its input.
    #[tokio::test]
    async fn test_all_outputs_succeed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let output_meta = vec![dummy_file_meta()];

        let merger = MockMerger::new(vec![Ok(output_meta.clone())]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(input_meta.clone())],
                filter_deleted: false,
                output_time_range: None,
            }],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        assert_eq!(merge_output.files_to_add.len(), 1);
        assert_eq!(merge_output.files_to_add[0].file_id, output_meta[0].file_id);
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(merge_output.files_to_remove[0].file_id, input_meta.file_id);
    }

    /// Expired SSTs are removed even if every merge fails.
    #[tokio::test]
    async fn test_expired_ssts_always_removed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let expired_meta = dummy_file_meta();

        // The single merge output fails, but expired SSTs should still be removed.
        let merger = MockMerger::new(vec![Err(error::InvalidMetaSnafu {
            reason: "fail".to_string(),
        }
        .build())]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(input_meta.clone())],
                filter_deleted: false,
                output_time_range: None,
            }],
            expired_ssts: vec![new_file_handle(expired_meta.clone())],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        // No files added (merge failed).
        assert!(merge_output.files_to_add.is_empty());
        // Only the expired SST should be in files_to_remove (not the failed merge's input).
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(
            merge_output.files_to_remove[0].file_id,
            expired_meta.file_id
        );
    }
}