mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
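/// Built by [CompactionStatus::new_compaction_request], which drains the waiters
/// accumulated on the region's [CompactionStatus].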
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
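    /// Manages the memory quota for compaction tasks.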
    memory_manager: Arc<CompactionMemoryManager>,
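    /// Policy to apply when the compaction memory quota is exhausted.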
    memory_policy: OnExhaustedPolicy,
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}

impl CompactionScheduler {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
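    ///
    /// If the region is already compacting, a regular request merges its waiter into the
    /// running task, while a strict-window (manual) request is stored as a pending request
    /// and re-scheduled once the current task finishes.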
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction if the region is in staging state.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
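            // Reply with 0 affected rows so callers don't block on a skipped compaction.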
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // The incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );

        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;
        if matches!(result, Ok(true)) {
            // Only if the compaction request is scheduled successfully,
            // we insert the region into the status map.
            self.region_status.insert(region_id, status);
        }

        self.listener.on_compaction_scheduled(region_id);
        result.map(|_| ())
    }

    /// Handles a pending manual compaction request for the region.
    ///
    /// Returns `true` if the caller should return early, `false` otherwise.
    pub(crate) async fn handle_pending_compaction_request(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> bool {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return true;
        };

        // If there is a pending manual compaction request, schedule it.
        let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
            return false;
        };

        let PendingCompaction {
            options,
            waiter,
            max_parallelism,
        } = pending_request;

        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );

        match self.schedule_compaction_request(request, options).await {
            Ok(true) => {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
                true
            }
            Ok(false) => {
                // We still need to handle the pending DDL requests.
                // So we can't return early here.
                false
            }
            Err(e) => {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
                true
            }
        }
    }

    /// Notifies the scheduler that the compaction job is finished successfully.
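    ///
    /// Returns the pending DDL requests that were blocked by the compaction, if any.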
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Vec<SenderDdlRequest> {
        // If there is a pending compaction request, handle it first
        // and defer returning the pending DDL requests to the caller.
        if self
            .handle_pending_compaction_request(
                region_id,
                manifest_ctx,
                schema_metadata_manager.clone(),
            )
            .await
        {
            return Vec::new();
        }

        let Some(status) = self.region_status.get_mut(&region_id) else {
            // The region status might be removed by the previous steps.
            // So we return empty DDL requests.
            return Vec::new();
        };

        // Notify all waiters that compaction is finished.
        for waiter in std::mem::take(&mut status.waiters) {
            waiter.send(Ok(0));
        }

        // If there are pending DDL requests, run them.
        let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
        if !pending_ddl_requests.is_empty() {
            self.region_status.remove(&region_id);
            // If there are pending DDL requests, we should return them to the caller
            // and skip scheduling the next compaction task.
            return pending_ddl_requests;
        }

        // We should always try to compact the region until the picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );

        // Try to schedule the next compaction task for this region.
        match self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            Ok(true) => {
                debug!(
                    "Successfully scheduled next compaction for region id: {}",
                    region_id
                );
            }
            Ok(false) => {
                // No further compaction tasks can be scheduled; clean up the `CompactionStatus` for this region.
                // All DDL requests and pending compaction requests have already been processed.
                // Safe to remove the region from status tracking.
                self.region_status.remove(&region_id);
            }
            Err(e) => {
                error!(e; "Failed to schedule next compaction for region {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
            }
        }

        Vec::new()
    }

    /// Notifies the scheduler that the compaction job failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        self.remove_region_on_failure(region_id, err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Adds a DDL request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region is not currently compacting.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        debug!(
            "Added pending DDL request for region: {}, ddl: {:?}",
            request.region_id, request.request
        );
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddl_requests.push(request);
    }

    #[cfg(test)]
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        let has_pending = self
            .region_status
            .get(&region_id)
            .map(|status| !status.pending_ddl_requests.is_empty())
            .unwrap_or(false);
        debug!(
            "Checked pending DDL requests for region: {}, has_pending: {}",
            region_id, has_pending
        );
        has_pending
    }

    /// Returns true if the region is compacting.
    pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a compaction request.
    ///
    /// Returns true if the compaction request is scheduled successfully.
    /// Returns false if no compaction task can be scheduled for this region.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<bool> {
        let region_id = request.region_id();
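        // Resolve DB-level compaction options and TTL, falling back to the region's own
        // options if the metadata fetch fails.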
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id.table_id(),
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let Some(picker_output) = picker_output else {
            // Nothing to compact, we are done. Notify all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            return Ok(false);
        };

        // If specified to run compaction remotely, we schedule the compaction job remotely.
        // It will fall back to local compaction if there is no remote job scheduler.
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(true);
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Return the waiters back to the caller for local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let estimated_bytes = estimate_compaction_bytes(&picker_output);
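        // The memory manager uses this estimate, together with `memory_policy`, to decide
        // how to handle the task when memory is tight.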
        let local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
            .map(|_| true)
    }

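    /// Submits the task to the background scheduler and tracks the inflight compaction count.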
    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .inspect_err(
                |e| error!(e; "Failed to submit compaction request for region {}", region_id),
            )
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notify all pending tasks.
        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Resolves compaction options and TTL with a single metadata fetch to reduce round trips.
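///
/// Region-level options take precedence: when `compaction_override` is set (and TTL is
/// present) the region options are used directly; otherwise database-level schema options
/// fill in whatever the region does not override.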
async fn find_dynamic_options(
    table_id: TableId,
    region_options: &crate::region::options::RegionOptions,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
    if region_options.compaction_override && region_options.ttl.is_some() {
        debug!(
            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
            table_id, region_options.compaction, region_options.ttl
        );
        return Ok((
            region_options.compaction.clone(),
            region_options.ttl.unwrap(),
        ));
    }

    let db_options = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?;

    let ttl = if region_options.ttl.is_some() {
        debug!(
            "Use region TTL directly for table {}: ttl={:?}",
            table_id, region_options.ttl
        );
        region_options.ttl.unwrap()
    } else {
        db_options
            .as_ref()
            .and_then(|options| options.ttl)
            .unwrap_or_default()
            .into()
    };

    let compaction = if !region_options.compaction_override {
        if let Some(schema_opts) = db_options {
            let map: HashMap<String, String> = schema_opts
                .extra_options
                .iter()
                .filter_map(|(k, v)| {
                    if k.starts_with("compaction.") {
                        Some((k.clone(), v.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            if map.is_empty() {
                region_options.compaction.clone()
            } else {
                crate::region::options::RegionOptions::try_from(&map)
                    .map(|o| o.compaction)
                    .unwrap_or_else(|e| {
                        error!(e; "Failed to create RegionOptions from map");
                        region_options.compaction.clone()
                    })
            }
        } else {
            debug!(
                "No DB options for table {}, use region compaction: compaction={:?}",
                table_id, region_options.compaction
            );
            region_options.compaction.clone()
        }
    } else {
        debug!(
            "Compaction options are overridden for table {}, use region compaction: compaction={:?}",
            table_id, region_options.compaction
        );
        region_options.compaction.clone()
    };

    debug!(
        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
        table_id, compaction, ttl
    );
    Ok((compaction, ttl))
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    waiters: Vec<OutputTx>,
    /// Pending compaction that is supposed to run as soon as the current compaction task finishes.
    pending_request: Option<PendingCompaction>,
    /// Pending DDL requests that should run when compaction is done.
    pending_ddl_requests: Vec<SenderDdlRequest>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
            pending_ddl_requests: Vec::new(),
        }
    }

    /// Merges the waiter into the waiters of the running compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending compaction request, replacing the current one if it already exists.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }

        for pending_ddl in self.pending_ddl_requests {
            pending_ddl
                .sender
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies an output time range.
    pub output_time_range: Option<TimestampRange>,
}

/// A serialized version of [CompactionOutput] that replaces [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

/// Builder that creates a [BoxedBatchReader] for compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds a [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    /// Builds a [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mapper = ProjectionMapper::all(&self.metadata, flat_format)?;
        let mut scan_input = ScanInput::new(self.sst_layer, mapper)
            .with_files(self.inputs.to_vec())
            .with_append_mode(self.append_mode)
            // We use a special cache strategy for compaction.
            .with_cache(CacheStrategy::Compaction(self.cache))
            .with_filter_deleted(self.filter_deleted)
            // We ignore file not found errors during compaction.
            .with_ignore_file_not_found(true)
            .with_merge_mode(self.merge_mode)
            .with_flat_format(flat_format);

        // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting time ranges into a predicate.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

/// Converts a time range to predicates so that rows outside the range are filtered out.
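/// The start bound is inclusive and the end bound is exclusive.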
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // safety: time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

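/// Converts a timestamp into a DataFusion literal in the time index column's unit,
/// failing if the conversion overflows.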
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// Estimates compaction memory as the sum of all input files' maximum row-group
/// uncompressed sizes.
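///
/// This is a rough upper bound that assumes each input file contributes at most one
/// decoded row group at a time.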
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
    picker_output
        .outputs
        .iter()
        .flat_map(|output| output.inputs.iter())
        .map(|file: &FileHandle| {
            let meta = file.meta_ref();
            meta.max_row_group_uncompressed_size
        })
        .sum()
}

/// Pending compaction request that is supposed to run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use common_meta::key::schema_name::SchemaNameValue;
    use common_time::DatabaseTimeToLive;
    use tokio::sync::{Barrier, oneshot};

    use super::*;
    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
    use crate::error::InvalidSchedulerStateSnafu;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::schedule::scheduler::{Job, Scheduler};
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

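    /// A scheduler that fails every `schedule` call, used to exercise submission failures.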
    struct FailingScheduler;

    #[async_trait::async_trait]
    impl Scheduler for FailingScheduler {
        fn schedule(&self, _job: Job) -> Result<()> {
            InvalidSchedulerStateSnafu.fail()
        }

        async fn stop(&self, _await_termination: bool) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_find_compaction_options_db_level() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        let region_id = builder.region_id();
        let table_id = region_id.table_id();
        // Register table without ttl but with db-level compaction options
        let mut schema_value = SchemaNameValue {
            ttl: Some(DatabaseTimeToLive::default()),
            ..Default::default()
        };
        schema_value
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());
        schema_value
            .extra_options
            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
        schema_metadata_manager
            .register_region_table_info(
                table_id,
                "t",
                "c",
                "s",
                Some(schema_value),
                kv_backend.clone(),
            )
            .await;

        let version_control = Arc::new(builder.build());
        let region_opts = version_control.current().version.options.clone();
        let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
            .await
            .unwrap();
        match opts {
            crate::region::options::CompactionOptions::Twcs(t) => {
                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
            }
        }
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let (otx, _orx) = oneshot::channel();
        let request = scheduler
            .region_status
            .entry(region_id)
            .or_insert_with(|| {
                crate::compaction::CompactionStatus::new(
                    region_id,
                    version_control.clone(),
                    env.access_layer.clone(),
                )
            })
            .new_compaction_request(
                scheduler.request_sender.clone(),
                OptionOutputTx::new(Some(OutputTx::new(otx))),
                scheduler.engine_config.clone(),
                scheduler.cache_manager.clone(),
                &manifest_ctx,
                scheduler.listener.clone(),
                schema_metadata_manager.clone(),
                1,
            );
        scheduler
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_find_compaction_options_priority() {
        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
            let mut schema_value = SchemaNameValue {
                ttl: Some(DatabaseTimeToLive::default()),
                ..Default::default()
            };
            schema_value
                .extra_options
                .insert("compaction.type".to_string(), "twcs".to_string());
            schema_value.extra_options.insert(
                "compaction.twcs.time_window".to_string(),
                time_window.to_string(),
            );
            schema_value
        }

        let cases = [
            (
                "db options set and table override set",
                Some(schema_value_with_twcs("2h")),
                true,
                Some(Duration::from_secs(5 * 3600)),
                Some(5 * 3600),
            ),
            (
                "db options set and table override not set",
                Some(schema_value_with_twcs("2h")),
                false,
                None,
                Some(2 * 3600),
            ),
            (
                "db options not set and table override set",
                None,
                true,
                Some(Duration::from_secs(4 * 3600)),
                Some(4 * 3600),
            ),
            (
                "db options not set and table override not set",
                None,
                false,
                None,
                None,
            ),
        ];

        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
            let builder = VersionControlBuilder::new();
            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
            let table_id = builder.region_id().table_id();
            schema_metadata_manager
                .register_region_table_info(
                    table_id,
                    "t",
                    "c",
                    "s",
                    schema_value,
                    kv_backend.clone(),
                )
                .await;

            let version_control = Arc::new(builder.build());
            let mut region_opts = version_control.current().version.options.clone();
            region_opts.compaction_override = override_set;
            if let Some(window) = table_window {
                let crate::region::options::CompactionOptions::Twcs(twcs) =
                    &mut region_opts.compaction;
                twcs.time_window = Some(window);
            }

            let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
                .await
                .unwrap();
            match opts {
                crate::region::options::CompactionOptions::Twcs(t) => {
                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
                }
            }
        }
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file, picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Add 5 files for the next compaction and remove the old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The task is pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // On compaction finished, schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // Add 5 files for the next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The task is pending.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }
1375
1376    #[tokio::test]
1377    async fn test_manual_compaction_when_compaction_in_progress() {
1378        common_telemetry::init_default_ut_logging();
1379        let job_scheduler = Arc::new(VecScheduler::default());
1380        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1381        let (tx, _rx) = mpsc::channel(4);
1382        let mut scheduler = env.mock_compaction_scheduler(tx);
1383        let mut builder = VersionControlBuilder::new();
1384        let purger = builder.file_purger();
1385        let region_id = builder.region_id();
1386
1387        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1388        schema_metadata_manager
1389            .register_region_table_info(
1390                builder.region_id().table_id(),
1391                "test_table",
1392                "test_catalog",
1393                "test_schema",
1394                None,
1395                kv_backend,
1396            )
1397            .await;
1398
1399        // 5 files to compact.
1400        let end = 1000 * 1000;
1401        let version_control = Arc::new(
1402            builder
1403                .push_l0_file(0, end)
1404                .push_l0_file(10, end)
1405                .push_l0_file(50, end)
1406                .push_l0_file(80, end)
1407                .push_l0_file(90, end)
1408                .build(),
1409        );
1410        let manifest_ctx = env
1411            .mock_manifest_context(version_control.current().version.metadata.clone())
1412            .await;
1413
1414        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
1415            .files
1416            .values()
1417            .map(|file| file.meta_ref().clone())
1418            .collect();
1419
1420        // 5 files for next compaction and removes old files.
1421        apply_edit(
1422            &version_control,
1423            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1424            &file_metas,
1425            purger.clone(),
1426        );
1427
1428        scheduler
1429            .schedule_compaction(
1430                region_id,
1431                compact_request::Options::Regular(Default::default()),
1432                &version_control,
1433                &env.access_layer,
1434                OptionOutputTx::none(),
1435                &manifest_ctx,
1436                schema_metadata_manager.clone(),
1437                1,
1438            )
1439            .await
1440            .unwrap();
1441        // Should schedule 1 compaction.
1442        assert_eq!(1, scheduler.region_status.len());
1443        assert_eq!(1, job_scheduler.num_jobs());
1444        assert!(
1445            scheduler
1446                .region_status
1447                .get(&region_id)
1448                .unwrap()
1449                .pending_request
1450                .is_none()
1451        );
1452
        // Schedule another manual compaction.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // The job count should still be 1 since a compaction is already in
        // progress; the manual request is stored as a pending request instead.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // Finishing the current compaction should schedule the pending manual
        // compaction as the next job.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

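        // The pending request has been consumed by the newly scheduled job.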
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        // Create version control and manifest context for staging mode.
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create a staging manifest context using the same pattern as SchedulerEnv.
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };
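        // RegionRoleState::Leader(RegionLeaderState::Staging) marks the region
        // as a staging leader, which is what triggers the bypass below.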

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // A regular compaction request against a staging region is bypassed.
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        // The waiter resolves immediately with 0 and no compaction status is
        // recorded for the region.
        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }

    #[tokio::test]
    async fn test_add_ddl_request_to_pending() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

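        // A compaction status for the region means a compaction is considered
        // in flight, so the DDL request below is queued as pending.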
        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
    }

    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_compaction_failed() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
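        // A failed compaction should clear the pending DDLs and notify their
        // waiters with the error.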
        scheduler
            .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }

    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_closed() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
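        // Closing the region has the same effect: pending DDLs fail fast.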
        scheduler.on_region_closed(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }

    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_dropped() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
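        // Dropping the region likewise fails any queued DDL requests.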
        scheduler.on_region_dropped(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }

    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_truncated() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
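        // Truncating the region also fails any queued DDL requests.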
        scheduler.on_region_truncated(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }

    #[tokio::test]
    async fn test_on_compaction_finished_returns_pending_ddl_requests() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

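        // Finishing the compaction hands the pending DDLs back to the caller;
        // with nothing left to compact, no new job is scheduled and the
        // region status is removed.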
        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.has_pending_ddls(region_id));
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(job_scheduler.num_jobs(), 0);
    }

    #[tokio::test]
    async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let (manual_tx, manual_rx) = oneshot::channel();
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::Regular(Default::default()),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });
        scheduler.region_status.insert(region_id, status);

        let (ddl_tx, _ddl_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(ddl_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

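        // The pending manual compaction is replayed on finish; since there is
        // nothing to compact it resolves as a no-op with 0, and the pending
        // DDLs are handed back.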
        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
    }

    #[tokio::test]
    async fn test_on_compaction_finished_returns_empty_when_region_absent() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let region_id = builder.region_id();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
    }

    #[tokio::test]
    async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
        let env = SchedulerEnv::new()
            .await
            .scheduler(Arc::new(FailingScheduler));
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let (manual_tx, manual_rx) = oneshot::channel();
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::Regular(Default::default()),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });
        scheduler.region_status.insert(region_id, status);

        let (ddl_tx, ddl_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(ddl_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

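        // FailingScheduler rejects the replayed manual compaction: the region
        // status is cleaned up, the manual waiter is dropped, and the pending
        // DDL request fails.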
        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert!(manual_rx.await.is_err());
        assert_matches!(ddl_rx.await.unwrap(), Err(_));
    }

    #[tokio::test]
    async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

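        // With no pending DDLs and nothing to compact, the next schedule is a
        // no-op and the region status is simply removed.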
        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
    }

    #[tokio::test]
    async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
        let env = SchedulerEnv::new()
            .await
            .scheduler(Arc::new(FailingScheduler));
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

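        // The next schedule finds the five L0 files to compact, but
        // FailingScheduler rejects the job, so the status is cleaned up too.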
        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
    }

    #[tokio::test]
    async fn test_concurrent_memory_competition() {
        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MiB limit
        let barrier = Arc::new(Barrier::new(3));
        let mut handles = vec![];

        // Spawn 3 tasks competing for memory, each trying to acquire 2MiB.
        for _ in 0..3 {
            let mgr = manager.clone();
            let bar = barrier.clone();
            let handle = tokio::spawn(async move {
                bar.wait().await; // Synchronize the start of all tasks.
                mgr.try_acquire(2 * 1024 * 1024)
            });
            handles.push(handle);
        }

        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        // Only 1 acquisition should succeed: under the 3MiB limit, a single
        // 2MiB grant leaves too little room for another.
        let succeeded = results.iter().filter(|r| r.is_some()).count();
        let failed = results.iter().filter(|r| r.is_none()).count();

        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
        assert_eq!(failed, 2, "Expected 2 tasks to fail");

        // Dropping the guards must release all reserved memory.
        drop(results);
        assert_eq!(manager.used_bytes(), 0);
    }
}