Skip to main content

mito2/
compaction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod buckets;
16pub mod compactor;
17pub mod memory_manager;
18pub mod picker;
19pub mod run;
20mod task;
21#[cfg(test)]
22mod test_util;
23mod twcs;
24mod window;
25
26use std::collections::HashMap;
27use std::sync::{Arc, Mutex};
28use std::time::Instant;
29
30use api::v1::region::compact_request;
31use api::v1::region::compact_request::Options;
32use common_base::Plugins;
33use common_base::cancellation::CancellationHandle;
34use common_memory_manager::OnExhaustedPolicy;
35use common_meta::key::SchemaMetadataManagerRef;
36use common_telemetry::{debug, error, info, warn};
37use common_time::range::TimestampRange;
38use common_time::timestamp::TimeUnit;
39use common_time::{TimeToLive, Timestamp};
40use datafusion_common::ScalarValue;
41use datafusion_expr::Expr;
42use serde::{Deserialize, Serialize};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::RegionId;
46use task::MAX_PARALLEL_COMPACTION;
47use tokio::sync::mpsc::{self, Sender};
48
49use crate::access_layer::AccessLayerRef;
50use crate::cache::{CacheManagerRef, CacheStrategy};
51use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
52use crate::compaction::memory_manager::CompactionMemoryManager;
53use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
54use crate::compaction::task::CompactionTaskImpl;
55use crate::config::MitoConfig;
56use crate::error::{
57    CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu,
58    ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
59    RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
60};
61use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
62use crate::read::FlatSource;
63use crate::read::flat_projection::FlatProjectionMapper;
64use crate::read::scan_region::{PredicateGroup, ScanInput};
65use crate::read::seq_scan::SeqScan;
66use crate::region::options::{MergeMode, RegionOptions};
67use crate::region::version::VersionControlRef;
68use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
69use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
70use crate::schedule::remote_job_scheduler::{
71    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
72};
73use crate::schedule::scheduler::SchedulerRef;
74use crate::sst::file::{FileHandle, FileMeta, Level};
75use crate::sst::version::LevelMeta;
76use crate::worker::WorkerListener;
77
/// Region compaction request.
///
/// Bundles everything needed to pick and run one compaction for a region: a snapshot of the
/// region version, the engine/cache/manifest handles, and the waiters to notify.
pub struct CompactionRequest {
    /// Engine configuration used when building the compaction task.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Snapshot of the region version that the compaction picker operates on.
    pub(crate) current_version: CompactionVersion,
    /// Access layer of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    /// Cache manager handed to the compaction region.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Worker listener forwarded to the local compaction task.
    pub(crate) listener: WorkerListener,
    /// Used to resolve schema-level (database) compaction options and TTL.
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    /// Maximum parallelism forwarded to the compaction region.
    pub(crate) max_parallelism: usize,
}
95
96impl CompactionRequest {
97    pub(crate) fn region_id(&self) -> RegionId {
98        self.current_version.metadata.region_id
99    }
100}
101
/// Compaction scheduler tracks and manages compaction tasks.
///
/// At most one [CompactionStatus] is tracked per region (keyed by [RegionId]).
pub(crate) struct CompactionScheduler {
    /// Scheduler that runs submitted local compaction tasks.
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    /// Cache manager passed into every compaction request.
    cache_manager: CacheManagerRef,
    /// Engine configuration; also supplies `max_background_compactions` to the picker.
    engine_config: Arc<MitoConfig>,
    /// Memory manager handed to every local compaction task.
    memory_manager: Arc<CompactionMemoryManager>,
    /// Memory-exhaustion policy handed to every local compaction task.
    memory_policy: OnExhaustedPolicy,
    /// Listener notified after compaction scheduling attempts.
    listener: WorkerListener,
    /// Plugins for the compaction scheduler (e.g. an optional remote job scheduler).
    plugins: Plugins,
}
117
118impl CompactionScheduler {
119    #[allow(clippy::too_many_arguments)]
120    pub(crate) fn new(
121        scheduler: SchedulerRef,
122        request_sender: Sender<WorkerRequestWithTime>,
123        cache_manager: CacheManagerRef,
124        engine_config: Arc<MitoConfig>,
125        listener: WorkerListener,
126        plugins: Plugins,
127        memory_manager: Arc<CompactionMemoryManager>,
128        memory_policy: OnExhaustedPolicy,
129    ) -> Self {
130        Self {
131            scheduler,
132            region_status: HashMap::new(),
133            request_sender,
134            cache_manager,
135            engine_config,
136            memory_manager,
137            memory_policy,
138            listener,
139            plugins,
140        }
141    }
142
143    /// Schedules a compaction for the region.
144    #[allow(clippy::too_many_arguments)]
145    pub(crate) async fn schedule_compaction(
146        &mut self,
147        region_id: RegionId,
148        compact_options: compact_request::Options,
149        version_control: &VersionControlRef,
150        access_layer: &AccessLayerRef,
151        waiter: OptionOutputTx,
152        manifest_ctx: &ManifestContextRef,
153        schema_metadata_manager: SchemaMetadataManagerRef,
154        max_parallelism: usize,
155    ) -> Result<()> {
156        // skip compaction if region is in staging state
157        let current_state = manifest_ctx.current_state();
158        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
159            info!(
160                "Skipping compaction for region {} in staging mode, options: {:?}",
161                region_id, compact_options
162            );
163            waiter.send(Ok(0));
164            return Ok(());
165        }
166
167        if let Some(status) = self.region_status.get_mut(&region_id) {
168            match compact_options {
169                Options::Regular(_) => {
170                    // Region is compacting. Add the waiter to pending list.
171                    status.merge_waiter(waiter);
172                }
173                options @ Options::StrictWindow(_) => {
174                    // Incoming compaction request is manually triggered.
175                    status.set_pending_request(PendingCompaction {
176                        options,
177                        waiter,
178                        max_parallelism,
179                    });
180                    info!(
181                        "Region {} is compacting, manually compaction will be re-scheduled.",
182                        region_id
183                    );
184                }
185            }
186            return Ok(());
187        }
188
189        // The region can compact directly.
190        let mut status =
191            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
192        let request = status.new_compaction_request(
193            self.request_sender.clone(),
194            waiter,
195            self.engine_config.clone(),
196            self.cache_manager.clone(),
197            manifest_ctx,
198            self.listener.clone(),
199            schema_metadata_manager,
200            max_parallelism,
201        );
202
203        let result = match self
204            .schedule_compaction_request(request, compact_options)
205            .await
206        {
207            Ok(Some(active_compaction)) => {
208                // Publish CompactionStatus only after a task has been accepted by the scheduler.
209                // This avoids exposing a half-initialized region status that could collect pending
210                // DDL/compaction state even though no compaction is actually running.
211                status.active_compaction = Some(active_compaction);
212                self.region_status.insert(region_id, status);
213
214                Ok(())
215            }
216            Ok(None) => Ok(()),
217            Err(e) => Err(e),
218        };
219
220        self.listener.on_compaction_scheduled(region_id);
221        result
222    }
223
224    // Handle pending manual compaction request for the region.
225    //
226    // Returns true if should early return, false otherwise.
227    pub(crate) async fn handle_pending_compaction_request(
228        &mut self,
229        region_id: RegionId,
230        manifest_ctx: &ManifestContextRef,
231        schema_metadata_manager: SchemaMetadataManagerRef,
232    ) -> bool {
233        let Some(status) = self.region_status.get_mut(&region_id) else {
234            return true;
235        };
236
237        // If there is a pending manual compaction request, schedule it.
238        // and defer returning the pending DDL requests to the caller.
239        let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
240            return false;
241        };
242
243        let PendingCompaction {
244            options,
245            waiter,
246            max_parallelism,
247        } = pending_request;
248
249        let request = {
250            status.new_compaction_request(
251                self.request_sender.clone(),
252                waiter,
253                self.engine_config.clone(),
254                self.cache_manager.clone(),
255                manifest_ctx,
256                self.listener.clone(),
257                schema_metadata_manager,
258                max_parallelism,
259            )
260        };
261
262        match self.schedule_compaction_request(request, options).await {
263            Ok(Some(active_compaction)) => {
264                let status = self.region_status.get_mut(&region_id).unwrap();
265                status.active_compaction = Some(active_compaction);
266                debug!(
267                    "Successfully scheduled manual compaction for region id: {}",
268                    region_id
269                );
270                true
271            }
272            Ok(None) => {
273                // We still need to handle the pending DDL requests.
274                // So we can't return early here.
275                false
276            }
277            Err(e) => {
278                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
279                self.remove_region_on_failure(region_id, Arc::new(e));
280                true
281            }
282        }
283    }
284
285    /// Notifies the scheduler that the compaction job is finished successfully.
286    pub(crate) async fn on_compaction_finished(
287        &mut self,
288        region_id: RegionId,
289        manifest_ctx: &ManifestContextRef,
290        schema_metadata_manager: SchemaMetadataManagerRef,
291    ) -> Vec<SenderDdlRequest> {
292        let Some(status) = self.region_status.get_mut(&region_id) else {
293            return Vec::new();
294        };
295        status.clear_running_task();
296
297        // If there a pending compaction request, handle it first
298        // and defer returning the pending DDL requests to the caller.
299        if self
300            .handle_pending_compaction_request(
301                region_id,
302                manifest_ctx,
303                schema_metadata_manager.clone(),
304            )
305            .await
306        {
307            return Vec::new();
308        }
309
310        let Some(status) = self.region_status.get_mut(&region_id) else {
311            // The region status might be removed by the previous steps.
312            // So we return empty DDL requests.
313            return Vec::new();
314        };
315
316        for waiter in std::mem::take(&mut status.waiters) {
317            waiter.send(Ok(0));
318        }
319
320        // If there are pending DDL requests, run them.
321        let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
322        if !pending_ddl_requests.is_empty() {
323            self.region_status.remove(&region_id);
324            // If there are pending DDL requests, we should return them to the caller.
325            // And skip try to schedule next compaction task.
326            return pending_ddl_requests;
327        }
328
329        // We should always try to compact the region until picker returns None.
330        let request = status.new_compaction_request(
331            self.request_sender.clone(),
332            OptionOutputTx::none(),
333            self.engine_config.clone(),
334            self.cache_manager.clone(),
335            manifest_ctx,
336            self.listener.clone(),
337            schema_metadata_manager,
338            MAX_PARALLEL_COMPACTION,
339        );
340
341        // Try to schedule next compaction task for this region.
342        match self
343            .schedule_compaction_request(
344                request,
345                compact_request::Options::Regular(Default::default()),
346            )
347            .await
348        {
349            Ok(Some(active_compaction)) => {
350                self.region_status
351                    .get_mut(&region_id)
352                    .unwrap()
353                    .active_compaction = Some(active_compaction);
354                debug!(
355                    "Successfully scheduled next compaction for region id: {}",
356                    region_id
357                );
358            }
359            Ok(None) => {
360                // No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region.
361                // All DDL requests and pending compaction requests have already been processed.
362                // Safe to remove the region from status tracking.
363                self.region_status.remove(&region_id);
364            }
365            Err(e) => {
366                error!(e; "Failed to schedule next compaction for region {}", region_id);
367                self.remove_region_on_failure(region_id, Arc::new(e));
368            }
369        }
370
371        Vec::new()
372    }
373
374    /// Notifies the scheduler that the compaction job is cancelled cooperatively.
375    pub(crate) async fn on_compaction_cancelled(
376        &mut self,
377        region_id: RegionId,
378    ) -> Vec<SenderDdlRequest> {
379        self.remove_region_on_cancel(region_id)
380    }
381
382    /// Notifies the scheduler that the compaction job is failed.
383    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
384        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
385        self.remove_region_on_failure(region_id, err);
386    }
387
388    /// Notifies the scheduler that the region is dropped.
389    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
390        self.remove_region_on_failure(
391            region_id,
392            Arc::new(RegionDroppedSnafu { region_id }.build()),
393        );
394    }
395
396    /// Notifies the scheduler that the region is closed.
397    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
398        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
399    }
400
401    /// Notifies the scheduler that the region is truncated.
402    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
403        self.remove_region_on_failure(
404            region_id,
405            Arc::new(RegionTruncatedSnafu { region_id }.build()),
406        );
407    }
408
409    /// Add ddl request to pending queue.
410    ///
411    /// # Panics
412    /// Panics if region didn't request compaction.
413    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
414        debug!(
415            "Added pending DDL request for region: {}, ddl: {:?}",
416            request.region_id, request.request
417        );
418        let status = self.region_status.get_mut(&request.region_id).unwrap();
419        status.pending_ddl_requests.push(request);
420    }
421
422    #[cfg(test)]
423    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
424        let has_pending = self
425            .region_status
426            .get(&region_id)
427            .map(|status| !status.pending_ddl_requests.is_empty())
428            .unwrap_or(false);
429        debug!(
430            "Checked pending DDL requests for region: {}, has_pending: {}",
431            region_id, has_pending
432        );
433        has_pending
434    }
435
436    pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult {
437        let Some(status) = self.region_status.get_mut(&region_id) else {
438            return RequestCancelResult::NotRunning;
439        };
440
441        status.request_cancel()
442    }
443
444    /// Schedules a compaction request.
445    ///
446    /// Returns the active compaction state if the request is scheduled successfully.
447    /// Returns `None` if no compaction task can be scheduled for this region.
448    async fn schedule_compaction_request(
449        &mut self,
450        request: CompactionRequest,
451        options: compact_request::Options,
452    ) -> Result<Option<ActiveCompaction>> {
453        let region_id = request.region_id();
454        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
455            region_id,
456            &request.current_version.options,
457            &request.schema_metadata_manager,
458        )
459        .await
460        .unwrap_or_else(|e| {
461            warn!(e; "Failed to find dynamic options for region: {}", region_id);
462            (
463                request.current_version.options.compaction.clone(),
464                request.current_version.options.ttl.unwrap_or_default(),
465            )
466        });
467
468        let picker = new_picker(
469            &options,
470            &dynamic_compaction_opts,
471            request.current_version.options.append_mode,
472            Some(self.engine_config.max_background_compactions),
473        );
474        let region_id = request.region_id();
475        let CompactionRequest {
476            engine_config,
477            current_version,
478            access_layer,
479            request_sender,
480            waiters,
481            start_time,
482            cache_manager,
483            manifest_ctx,
484            listener,
485            schema_metadata_manager: _,
486            max_parallelism,
487        } = request;
488
489        debug!(
490            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
491            picker, region_id, ttl
492        );
493
494        let compaction_region = CompactionRegion {
495            region_id,
496            current_version: current_version.clone(),
497            region_options: RegionOptions {
498                compaction: dynamic_compaction_opts.clone(),
499                ..current_version.options.clone()
500            },
501            engine_config: engine_config.clone(),
502            region_metadata: current_version.metadata.clone(),
503            cache_manager: cache_manager.clone(),
504            access_layer: access_layer.clone(),
505            manifest_ctx: manifest_ctx.clone(),
506            file_purger: None,
507            ttl: Some(ttl),
508            max_parallelism,
509        };
510
511        let picker_output = {
512            let _pick_timer = COMPACTION_STAGE_ELAPSED
513                .with_label_values(&["pick"])
514                .start_timer();
515            picker.pick(&compaction_region)
516        };
517
518        let picker_output = if let Some(picker_output) = picker_output {
519            picker_output
520        } else {
521            // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
522            for waiter in waiters {
523                waiter.send(Ok(0));
524            }
525            return Ok(None);
526        };
527
528        // If specified to run compaction remotely, we schedule the compaction job remotely.
529        // It will fall back to local compaction if there is no remote job scheduler.
530        let waiters = if dynamic_compaction_opts.remote_compaction() {
531            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
532                let remote_compaction_job = CompactionJob {
533                    compaction_region: compaction_region.clone(),
534                    picker_output: picker_output.clone(),
535                    start_time,
536                    waiters,
537                    ttl,
538                };
539
540                let result = remote_job_scheduler
541                    .schedule(
542                        RemoteJob::CompactionJob(remote_compaction_job),
543                        Box::new(DefaultNotifier {
544                            request_sender: request_sender.clone(),
545                        }),
546                    )
547                    .await;
548
549                match result {
550                    Ok(job_id) => {
551                        info!(
552                            "Scheduled remote compaction job {} for region {}",
553                            job_id, region_id
554                        );
555                        INFLIGHT_COMPACTION_COUNT.inc();
556                        return Ok(Some(ActiveCompaction::Remote));
557                    }
558                    Err(e) => {
559                        if !dynamic_compaction_opts.fallback_to_local() {
560                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
561                            return RemoteCompactionSnafu {
562                                region_id,
563                                job_id: None,
564                                reason: e.reason,
565                            }
566                            .fail();
567                        }
568
569                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
570
571                        // Return the waiters back to the caller for local compaction.
572                        e.waiters
573                    }
574                }
575            } else {
576                debug!(
577                    "Remote compaction is not enabled, fallback to local compaction for region {}",
578                    region_id
579                );
580                waiters
581            }
582        } else {
583            waiters
584        };
585
586        // Create a local compaction task.
587        let estimated_bytes = estimate_compaction_bytes(&picker_output);
588
589        let cancel_handle = Arc::new(CancellationHandle::default());
590        let state = LocalCompactionState::new(cancel_handle.clone());
591        let local_compaction_task = Box::new(CompactionTaskImpl {
592            state: state.clone(),
593            request_sender,
594            waiters,
595            start_time,
596            listener,
597            picker_output,
598            compaction_region,
599            compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())),
600            memory_manager: self.memory_manager.clone(),
601            memory_policy: self.memory_policy,
602            estimated_memory_bytes: estimated_bytes,
603        });
604
605        self.submit_compaction_task(local_compaction_task, region_id)
606            .map(|_| Some(ActiveCompaction::Local { state }))
607    }
608
609    fn submit_compaction_task(
610        &mut self,
611        mut task: Box<CompactionTaskImpl>,
612        region_id: RegionId,
613    ) -> Result<()> {
614        self.scheduler
615            .schedule(Box::pin(async move {
616                INFLIGHT_COMPACTION_COUNT.inc();
617                task.run().await;
618                INFLIGHT_COMPACTION_COUNT.dec();
619            }))
620            .inspect_err(
621                |e| error!(e; "Failed to submit compaction request for region {}", region_id),
622            )
623    }
624
625    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
626        // Remove this region.
627        let Some(status) = self.region_status.remove(&region_id) else {
628            return;
629        };
630
631        // Notifies all pending tasks.
632        status.on_failure(err);
633    }
634
635    fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec<SenderDdlRequest> {
636        let Some(status) = self.region_status.remove(&region_id) else {
637            return Vec::new();
638        };
639
640        status.on_cancel()
641    }
642}
643
/// Shared state of a local compaction task, used to coordinate cancellation with the task's
/// commit stage.
///
/// Both fields are `Arc`s, so clones share the same cancel handle and commit flag.
#[derive(Debug, Clone)]
pub(crate) struct LocalCompactionState {
    /// Handle used to request cooperative cancellation of the task.
    cancel_handle: Arc<CancellationHandle>,
    /// Whether the task has started committing. The mutex also serializes cancellation
    /// requests against marking the commit start.
    commit_started: Arc<Mutex<bool>>,
}
649
650#[derive(Debug)]
651enum ActiveCompaction {
652    Local { state: LocalCompactionState },
653    Remote,
654}
655
656impl LocalCompactionState {
657    fn new(cancel_handle: Arc<CancellationHandle>) -> Self {
658        Self {
659            cancel_handle,
660            commit_started: Arc::new(Mutex::new(false)),
661        }
662    }
663
664    /// Returns the cancellation handle for this compaction task.
665    pub(crate) fn cancel_handle(&self) -> Arc<CancellationHandle> {
666        self.cancel_handle.clone()
667    }
668
669    /// Marks the compaction task as started to commit,
670    /// which means the compaction task is in the final stage and is about to update region version and manifest.
671    /// It will reject cancellation request after this method is called.
672    ///
673    /// Returns true if this is the first time to mark commit started, false otherwise.
674    pub(crate) fn mark_commit_started(&self) -> bool {
675        let mut commit_started = self.commit_started.lock().unwrap();
676        if self.cancel_handle.is_cancelled() {
677            return false;
678        }
679        *commit_started = true;
680        true
681    }
682
683    /// Request cancellation for this compaction task.
684    pub(crate) fn request_cancel(&self) -> RequestCancelResult {
685        // The cancel handle must under the lock of `commit_started` to avoid racing between cancellation and commit.
686        let commit_started = self.commit_started.lock().unwrap();
687        if *commit_started {
688            return RequestCancelResult::TooLateToCancel;
689        }
690        if self.cancel_handle.is_cancelled() {
691            return RequestCancelResult::AlreadyCancelling;
692        }
693
694        self.cancel_handle.cancel();
695        RequestCancelResult::CancelIssued
696    }
697}
698
/// Outcome of asking a region's running compaction to stop.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RequestCancelResult {
    /// The cancel handle of the running local task was triggered by this request.
    CancelIssued,
    /// The cancel handle of the running local task was already cancelled.
    AlreadyCancelling,
    /// The task can no longer be cancelled: its commit has started, or it runs remotely.
    TooLateToCancel,
    /// There is no tracked compaction for the region.
    NotRunning,
}
706
707impl Drop for CompactionScheduler {
708    fn drop(&mut self) {
709        for (region_id, status) in self.region_status.drain() {
710            // We are shutting down so notify all pending tasks.
711            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
712        }
713    }
714}
715
716/// Finds compaction options and TTL together with a single metadata fetch to reduce RTT.
717async fn find_dynamic_options(
718    region_id: RegionId,
719    region_options: &crate::region::options::RegionOptions,
720    schema_metadata_manager: &SchemaMetadataManagerRef,
721) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
722    let table_id = region_id.table_id();
723    if let (true, Some(ttl)) = (region_options.compaction_override, region_options.ttl) {
724        debug!(
725            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
726            table_id, region_options.compaction, region_options.ttl
727        );
728        return Ok((region_options.compaction.clone(), ttl));
729    }
730
731    let db_options = tokio::time::timeout(
732        crate::config::FETCH_OPTION_TIMEOUT,
733        schema_metadata_manager.get_schema_options_by_table_id(table_id),
734    )
735    .await
736    .context(TimeoutSnafu)?
737    .context(GetSchemaMetadataSnafu)?;
738
739    let ttl = if let Some(ttl) = region_options.ttl {
740        debug!(
741            "Use region TTL directly for table {}: ttl={:?}",
742            table_id, region_options.ttl
743        );
744        ttl
745    } else {
746        db_options
747            .as_ref()
748            .and_then(|options| options.ttl)
749            .unwrap_or_default()
750            .into()
751    };
752
753    let compaction = if !region_options.compaction_override {
754        if let Some(schema_opts) = db_options {
755            let map: HashMap<String, String> = schema_opts
756                .extra_options
757                .iter()
758                .filter_map(|(k, v)| {
759                    if k.starts_with("compaction.") {
760                        Some((k.clone(), v.clone()))
761                    } else {
762                        None
763                    }
764                })
765                .collect();
766            if map.is_empty() {
767                region_options.compaction.clone()
768            } else {
769                crate::region::options::RegionOptions::try_from_options(region_id, &map)
770                    .map(|o| o.compaction)
771                    .unwrap_or_else(|e| {
772                        error!(e; "Failed to create RegionOptions from map");
773                        region_options.compaction.clone()
774                    })
775            }
776        } else {
777            debug!(
778                "DB options is None for table {}, use region compaction: compaction={:?}",
779                table_id, region_options.compaction
780            );
781            region_options.compaction.clone()
782        }
783    } else {
784        debug!(
785            "No schema options for table {}, use region compaction: compaction={:?}",
786            table_id, region_options.compaction
787        );
788        region_options.compaction.clone()
789    };
790
791    debug!(
792        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
793        table_id, compaction, ttl
794    );
795    Ok((compaction, ttl))
796}
797
/// Status of running and pending region compaction tasks.
///
/// A status exists in the scheduler only while the region has an accepted compaction.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Waiters of regular compaction requests that arrived while the region was compacting.
    waiters: Vec<OutputTx>,
    /// Pending manual compaction request, to run as soon as the current compaction task
    /// finishes. A newer request replaces an older one.
    pending_request: Option<PendingCompaction>,
    /// Pending DDL requests that should run when compaction is done.
    pending_ddl_requests: Vec<SenderDdlRequest>,
    /// Active compaction state.
    active_compaction: Option<ActiveCompaction>,
}
815
816impl CompactionStatus {
817    /// Creates a new [CompactionStatus]
818    fn new(
819        region_id: RegionId,
820        version_control: VersionControlRef,
821        access_layer: AccessLayerRef,
822    ) -> CompactionStatus {
823        CompactionStatus {
824            region_id,
825            version_control,
826            access_layer,
827            waiters: Vec::new(),
828            pending_request: None,
829            pending_ddl_requests: Vec::new(),
830            active_compaction: None,
831        }
832    }
833
834    #[cfg(test)]
835    fn start_local_task(&mut self) -> LocalCompactionState {
836        let state = LocalCompactionState::new(Arc::new(CancellationHandle::default()));
837        self.active_compaction = Some(ActiveCompaction::Local {
838            state: state.clone(),
839        });
840        state
841    }
842
843    #[cfg(test)]
844    fn start_remote_task(&mut self) {
845        self.active_compaction = Some(ActiveCompaction::Remote);
846    }
847
848    fn request_cancel(&mut self) -> RequestCancelResult {
849        let Some(active_compaction) = &self.active_compaction else {
850            return RequestCancelResult::NotRunning;
851        };
852
853        match active_compaction {
854            ActiveCompaction::Local { state, .. } => state.request_cancel(),
855            ActiveCompaction::Remote => RequestCancelResult::TooLateToCancel,
856        }
857    }
858
859    fn clear_running_task(&mut self) -> bool {
860        self.active_compaction.take().is_some()
861    }
862
863    /// Merge the waiter to the pending compaction.
864    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
865        if let Some(waiter) = waiter.take_inner() {
866            self.waiters.push(waiter);
867        }
868    }
869
870    /// Set pending compaction request or replace current value if already exist.
871    fn set_pending_request(&mut self, pending: PendingCompaction) {
872        if let Some(prev) = self.pending_request.replace(pending) {
873            debug!(
874                "Replace pending compaction options with new request {:?} for region: {}",
875                prev.options, self.region_id
876            );
877            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
878        }
879    }
880
881    fn on_failure(mut self, err: Arc<Error>) {
882        for waiter in self.waiters.drain(..) {
883            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
884                region_id: self.region_id,
885            }));
886        }
887
888        if let Some(pending_compaction) = self.pending_request {
889            pending_compaction
890                .waiter
891                .send(Err(err.clone()).context(CompactRegionSnafu {
892                    region_id: self.region_id,
893                }));
894        }
895
896        for pending_ddl in self.pending_ddl_requests {
897            pending_ddl
898                .sender
899                .send(Err(err.clone()).context(CompactRegionSnafu {
900                    region_id: self.region_id,
901                }));
902        }
903    }
904
905    #[must_use]
906    fn on_cancel(mut self) -> Vec<SenderDdlRequest> {
907        for waiter in self.waiters.drain(..) {
908            waiter.send(CompactionCancelledSnafu.fail());
909        }
910
911        if let Some(pending_compaction) = self.pending_request {
912            pending_compaction.waiter.send(
913                Err(Arc::new(CompactionCancelledSnafu.build())).context(CompactRegionSnafu {
914                    region_id: self.region_id,
915                }),
916            );
917        }
918
919        std::mem::take(&mut self.pending_ddl_requests)
920    }
921
922    /// Creates a new compaction request for compaction picker.
923    ///
924    /// It consumes all pending compaction waiters.
925    #[allow(clippy::too_many_arguments)]
926    fn new_compaction_request(
927        &mut self,
928        request_sender: Sender<WorkerRequestWithTime>,
929        mut waiter: OptionOutputTx,
930        engine_config: Arc<MitoConfig>,
931        cache_manager: CacheManagerRef,
932        manifest_ctx: &ManifestContextRef,
933        listener: WorkerListener,
934        schema_metadata_manager: SchemaMetadataManagerRef,
935        max_parallelism: usize,
936    ) -> CompactionRequest {
937        let current_version = CompactionVersion::from(self.version_control.current().version);
938        let start_time = Instant::now();
939        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
940        waiters.extend(std::mem::take(&mut self.waiters));
941
942        if let Some(waiter) = waiter.take_inner() {
943            waiters.push(waiter);
944        }
945
946        CompactionRequest {
947            engine_config,
948            current_version,
949            access_layer: self.access_layer.clone(),
950            request_sender: request_sender.clone(),
951            waiters,
952            start_time,
953            cache_manager,
954            manifest_ctx: manifest_ctx.clone(),
955            listener,
956            schema_metadata_manager,
957            max_parallelism,
958        }
959    }
960}
961
/// One compaction job: the input files to merge and how the merged output is written.
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies output time range.
    pub output_time_range: Option<TimestampRange>,
}
973
/// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta].
///
/// Field names and order are part of the serde representation; changing them changes
/// the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    /// Output file level; mirrors [CompactionOutput::output_level].
    output_level: Level,
    /// Metadata of the input files; mirrors [CompactionOutput::inputs].
    inputs: Vec<FileMeta>,
    /// Whether to remove deletion markers; mirrors [CompactionOutput::filter_deleted].
    filter_deleted: bool,
    /// Output time range; mirrors [CompactionOutput::output_time_range].
    output_time_range: Option<TimestampRange>,
}
982
983/// Builders to create [BoxedRecordBatchStream] for compaction.
984struct CompactionSstReaderBuilder<'a> {
985    metadata: RegionMetadataRef,
986    sst_layer: AccessLayerRef,
987    cache: CacheManagerRef,
988    inputs: &'a [FileHandle],
989    append_mode: bool,
990    filter_deleted: bool,
991    time_range: Option<TimestampRange>,
992    merge_mode: MergeMode,
993}
994
995impl CompactionSstReaderBuilder<'_> {
996    /// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files,
997    /// for compaction. The schema of the [FlatSource] is unified.
998    async fn build_flat_sst_reader(self) -> Result<FlatSource> {
999        let scan_input = self.build_scan_input()?.with_compaction(true);
1000
1001        let schema = scan_input.mapper.output_schema();
1002        let schema = schema.arrow_schema();
1003
1004        SeqScan::new(scan_input)
1005            .build_flat_reader_for_compaction()
1006            .await
1007            .map(|stream| FlatSource::new_stream(schema.clone(), stream))
1008    }
1009
1010    fn build_scan_input(self) -> Result<ScanInput> {
1011        let mapper = FlatProjectionMapper::all(&self.metadata)?;
1012        let mut scan_input = ScanInput::new(self.sst_layer, mapper)
1013            .with_files(self.inputs.to_vec())
1014            .with_append_mode(self.append_mode)
1015            // We use special cache strategy for compaction.
1016            .with_cache(CacheStrategy::Compaction(self.cache))
1017            .with_filter_deleted(self.filter_deleted)
1018            // We ignore file not found error during compaction.
1019            .with_ignore_file_not_found(true)
1020            .with_merge_mode(self.merge_mode);
1021
1022        // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
1023        // by converting time ranges into predicate.
1024        if let Some(time_range) = self.time_range {
1025            scan_input =
1026                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
1027        }
1028
1029        Ok(scan_input)
1030    }
1031}
1032
1033/// Converts time range to predicates so that rows outside the range will be filtered.
1034fn time_range_to_predicate(
1035    range: TimestampRange,
1036    metadata: &RegionMetadataRef,
1037) -> Result<PredicateGroup> {
1038    let ts_col = metadata.time_index_column();
1039
1040    // safety: time index column's type must be a valid timestamp type.
1041    let ts_col_unit = ts_col
1042        .column_schema
1043        .data_type
1044        .as_timestamp()
1045        .unwrap()
1046        .unit();
1047
1048    let exprs = match (range.start(), range.end()) {
1049        (Some(start), Some(end)) => {
1050            vec![
1051                datafusion_expr::col(ts_col.column_schema.name.clone())
1052                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1053                datafusion_expr::col(ts_col.column_schema.name.clone())
1054                    .lt(ts_to_lit(*end, ts_col_unit)?),
1055            ]
1056        }
1057        (Some(start), None) => {
1058            vec![
1059                datafusion_expr::col(ts_col.column_schema.name.clone())
1060                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1061            ]
1062        }
1063
1064        (None, Some(end)) => {
1065            vec![
1066                datafusion_expr::col(ts_col.column_schema.name.clone())
1067                    .lt(ts_to_lit(*end, ts_col_unit)?),
1068            ]
1069        }
1070        (None, None) => {
1071            return Ok(PredicateGroup::default());
1072        }
1073    };
1074
1075    let predicate = PredicateGroup::new(metadata, &exprs)?;
1076    Ok(predicate)
1077}
1078
1079fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
1080    let ts = ts
1081        .convert_to(ts_col_unit)
1082        .context(TimeRangePredicateOverflowSnafu {
1083            timestamp: ts,
1084            unit: ts_col_unit,
1085        })?;
1086    let val = ts.value();
1087    let scalar_value = match ts_col_unit {
1088        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
1089        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
1090        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
1091        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
1092    };
1093    Ok(datafusion_expr::lit(scalar_value))
1094}
1095
1096/// Finds all expired SSTs across levels.
1097fn get_expired_ssts(
1098    levels: &[LevelMeta],
1099    ttl: Option<TimeToLive>,
1100    now: Timestamp,
1101) -> Vec<FileHandle> {
1102    let Some(ttl) = ttl else {
1103        return vec![];
1104    };
1105
1106    levels
1107        .iter()
1108        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
1109        .collect()
1110}
1111
1112/// Estimates compaction memory as the sum of all input files' maximum row-group
1113/// uncompressed sizes.
1114fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
1115    picker_output
1116        .outputs
1117        .iter()
1118        .flat_map(|output| output.inputs.iter())
1119        .map(|file: &FileHandle| {
1120            let meta = file.meta_ref();
1121            meta.max_row_group_uncompressed_size
1122        })
1123        .sum()
1124}
1125
/// Pending compaction request that is supposed to run after the current task is finished,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [Options::StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request; it receives an error if this request is replaced
    /// by a newer one or if the running compaction fails or is cancelled.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for pending compaction.
    pub(crate) max_parallelism: usize,
}
1136
1137#[cfg(test)]
1138mod tests {
1139    use std::assert_matches;
1140    use std::time::Duration;
1141
1142    use api::v1::region::StrictWindow;
1143    use common_datasource::compression::CompressionType;
1144    use common_meta::key::schema_name::SchemaNameValue;
1145    use common_time::DatabaseTimeToLive;
1146    use tokio::sync::{Barrier, oneshot};
1147
1148    use super::*;
1149    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
1150    use crate::error::InvalidSchedulerStateSnafu;
1151    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1152    use crate::region::ManifestContext;
1153    use crate::schedule::scheduler::{Job, Scheduler};
1154    use crate::sst::FormatType;
1155    use crate::test_util::mock_schema_metadata_manager;
1156    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1157    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
1158
1159    struct FailingScheduler;
1160
1161    #[async_trait::async_trait]
1162    impl Scheduler for FailingScheduler {
1163        fn schedule(&self, _job: Job) -> Result<()> {
1164            InvalidSchedulerStateSnafu.fail()
1165        }
1166
1167        async fn stop(&self, _await_termination: bool) -> Result<()> {
1168            Ok(())
1169        }
1170    }
1171
1172    #[tokio::test]
1173    async fn test_find_compaction_options_db_level() {
1174        let env = SchedulerEnv::new().await;
1175        let builder = VersionControlBuilder::new();
1176        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1177        let region_id = builder.region_id();
1178        let table_id = region_id.table_id();
1179        // Register table without ttl but with db-level compaction options
1180        let mut schema_value = SchemaNameValue {
1181            ttl: Some(DatabaseTimeToLive::default()),
1182            ..Default::default()
1183        };
1184        schema_value
1185            .extra_options
1186            .insert("compaction.type".to_string(), "twcs".to_string());
1187        schema_value
1188            .extra_options
1189            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
1190        schema_metadata_manager
1191            .register_region_table_info(
1192                table_id,
1193                "t",
1194                "c",
1195                "s",
1196                Some(schema_value),
1197                kv_backend.clone(),
1198            )
1199            .await;
1200
1201        let version_control = Arc::new(builder.build());
1202        let region_opts = version_control.current().version.options.clone();
1203        let (opts, _) = find_dynamic_options(region_id, &region_opts, &schema_metadata_manager)
1204            .await
1205            .unwrap();
1206        match opts {
1207            crate::region::options::CompactionOptions::Twcs(t) => {
1208                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
1209            }
1210        }
1211        let manifest_ctx = env
1212            .mock_manifest_context(version_control.current().version.metadata.clone())
1213            .await;
1214        let (tx, _rx) = mpsc::channel(4);
1215        let mut scheduler = env.mock_compaction_scheduler(tx);
1216        let (otx, _orx) = oneshot::channel();
1217        let request = scheduler
1218            .region_status
1219            .entry(region_id)
1220            .or_insert_with(|| {
1221                crate::compaction::CompactionStatus::new(
1222                    region_id,
1223                    version_control.clone(),
1224                    env.access_layer.clone(),
1225                )
1226            })
1227            .new_compaction_request(
1228                scheduler.request_sender.clone(),
1229                OptionOutputTx::new(Some(OutputTx::new(otx))),
1230                scheduler.engine_config.clone(),
1231                scheduler.cache_manager.clone(),
1232                &manifest_ctx,
1233                scheduler.listener.clone(),
1234                schema_metadata_manager.clone(),
1235                1,
1236            );
1237        scheduler
1238            .schedule_compaction_request(
1239                request,
1240                compact_request::Options::Regular(Default::default()),
1241            )
1242            .await
1243            .unwrap();
1244    }
1245
1246    #[tokio::test]
1247    async fn test_find_compaction_options_priority() {
1248        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
1249            let mut schema_value = SchemaNameValue {
1250                ttl: Some(DatabaseTimeToLive::default()),
1251                ..Default::default()
1252            };
1253            schema_value
1254                .extra_options
1255                .insert("compaction.type".to_string(), "twcs".to_string());
1256            schema_value.extra_options.insert(
1257                "compaction.twcs.time_window".to_string(),
1258                time_window.to_string(),
1259            );
1260            schema_value
1261        }
1262
1263        let cases = [
1264            (
1265                "db options set and table override set",
1266                Some(schema_value_with_twcs("2h")),
1267                true,
1268                Some(Duration::from_secs(5 * 3600)),
1269                Some(5 * 3600),
1270            ),
1271            (
1272                "db options set and table override not set",
1273                Some(schema_value_with_twcs("2h")),
1274                false,
1275                None,
1276                Some(2 * 3600),
1277            ),
1278            (
1279                "db options not set and table override set",
1280                None,
1281                true,
1282                Some(Duration::from_secs(4 * 3600)),
1283                Some(4 * 3600),
1284            ),
1285            (
1286                "db options not set and table override not set",
1287                None,
1288                false,
1289                None,
1290                None,
1291            ),
1292        ];
1293
1294        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
1295            let builder = VersionControlBuilder::new();
1296            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1297            let region_id = builder.region_id();
1298            let table_id = region_id.table_id();
1299            schema_metadata_manager
1300                .register_region_table_info(
1301                    table_id,
1302                    "t",
1303                    "c",
1304                    "s",
1305                    schema_value,
1306                    kv_backend.clone(),
1307                )
1308                .await;
1309
1310            let version_control = Arc::new(builder.build());
1311            let mut region_opts = version_control.current().version.options.clone();
1312            region_opts.compaction_override = override_set;
1313            if let Some(window) = table_window {
1314                let crate::region::options::CompactionOptions::Twcs(twcs) =
1315                    &mut region_opts.compaction;
1316                twcs.time_window = Some(window);
1317            }
1318
1319            let (opts, _) = find_dynamic_options(region_id, &region_opts, &schema_metadata_manager)
1320                .await
1321                .unwrap();
1322            match opts {
1323                crate::region::options::CompactionOptions::Twcs(t) => {
1324                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
1325                }
1326            }
1327        }
1328    }
1329
1330    #[tokio::test]
1331    async fn test_schedule_empty() {
1332        let env = SchedulerEnv::new().await;
1333        let (tx, _rx) = mpsc::channel(4);
1334        let mut scheduler = env.mock_compaction_scheduler(tx);
1335        let mut builder = VersionControlBuilder::new();
1336        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1337        schema_metadata_manager
1338            .register_region_table_info(
1339                builder.region_id().table_id(),
1340                "test_table",
1341                "test_catalog",
1342                "test_schema",
1343                None,
1344                kv_backend,
1345            )
1346            .await;
1347        // Nothing to compact.
1348        let version_control = Arc::new(builder.build());
1349        let (output_tx, output_rx) = oneshot::channel();
1350        let waiter = OptionOutputTx::from(output_tx);
1351        let manifest_ctx = env
1352            .mock_manifest_context(version_control.current().version.metadata.clone())
1353            .await;
1354        scheduler
1355            .schedule_compaction(
1356                builder.region_id(),
1357                compact_request::Options::Regular(Default::default()),
1358                &version_control,
1359                &env.access_layer,
1360                waiter,
1361                &manifest_ctx,
1362                schema_metadata_manager.clone(),
1363                1,
1364            )
1365            .await
1366            .unwrap();
1367        let output = output_rx.await.unwrap().unwrap();
1368        assert_eq!(output, 0);
1369        assert!(scheduler.region_status.is_empty());
1370
1371        // Only one file, picker won't compact it.
1372        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
1373        let (output_tx, output_rx) = oneshot::channel();
1374        let waiter = OptionOutputTx::from(output_tx);
1375        scheduler
1376            .schedule_compaction(
1377                builder.region_id(),
1378                compact_request::Options::Regular(Default::default()),
1379                &version_control,
1380                &env.access_layer,
1381                waiter,
1382                &manifest_ctx,
1383                schema_metadata_manager,
1384                1,
1385            )
1386            .await
1387            .unwrap();
1388        let output = output_rx.await.unwrap().unwrap();
1389        assert_eq!(output, 0);
1390        assert!(scheduler.region_status.is_empty());
1391    }
1392
1393    #[tokio::test]
1394    async fn test_schedule_on_finished() {
1395        common_telemetry::init_default_ut_logging();
1396        let job_scheduler = Arc::new(VecScheduler::default());
1397        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1398        let (tx, _rx) = mpsc::channel(4);
1399        let mut scheduler = env.mock_compaction_scheduler(tx);
1400        let mut builder = VersionControlBuilder::new();
1401        let purger = builder.file_purger();
1402        let region_id = builder.region_id();
1403
1404        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1405        schema_metadata_manager
1406            .register_region_table_info(
1407                builder.region_id().table_id(),
1408                "test_table",
1409                "test_catalog",
1410                "test_schema",
1411                None,
1412                kv_backend,
1413            )
1414            .await;
1415
1416        // 5 files to compact.
1417        let end = 1000 * 1000;
1418        let version_control = Arc::new(
1419            builder
1420                .push_l0_file(0, end)
1421                .push_l0_file(10, end)
1422                .push_l0_file(50, end)
1423                .push_l0_file(80, end)
1424                .push_l0_file(90, end)
1425                .build(),
1426        );
1427        let manifest_ctx = env
1428            .mock_manifest_context(version_control.current().version.metadata.clone())
1429            .await;
1430        scheduler
1431            .schedule_compaction(
1432                region_id,
1433                compact_request::Options::Regular(Default::default()),
1434                &version_control,
1435                &env.access_layer,
1436                OptionOutputTx::none(),
1437                &manifest_ctx,
1438                schema_metadata_manager.clone(),
1439                1,
1440            )
1441            .await
1442            .unwrap();
1443        // Should schedule 1 compaction.
1444        assert_eq!(1, scheduler.region_status.len());
1445        assert_eq!(1, job_scheduler.num_jobs());
1446        let data = version_control.current();
1447        let file_metas: Vec<_> = data.version.ssts.levels()[0]
1448            .files
1449            .values()
1450            .map(|file| file.meta_ref().clone())
1451            .collect();
1452
1453        // 5 files for next compaction and removes old files.
1454        apply_edit(
1455            &version_control,
1456            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1457            &file_metas,
1458            purger.clone(),
1459        );
1460        // The task is pending.
1461        let (tx, _rx) = oneshot::channel();
1462        scheduler
1463            .schedule_compaction(
1464                region_id,
1465                compact_request::Options::Regular(Default::default()),
1466                &version_control,
1467                &env.access_layer,
1468                OptionOutputTx::new(Some(OutputTx::new(tx))),
1469                &manifest_ctx,
1470                schema_metadata_manager.clone(),
1471                1,
1472            )
1473            .await
1474            .unwrap();
1475        assert_eq!(1, scheduler.region_status.len());
1476        assert_eq!(1, job_scheduler.num_jobs());
1477        assert!(
1478            !scheduler
1479                .region_status
1480                .get(&builder.region_id())
1481                .unwrap()
1482                .waiters
1483                .is_empty()
1484        );
1485
1486        // On compaction finished and schedule next compaction.
1487        scheduler
1488            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
1489            .await;
1490        assert_eq!(1, scheduler.region_status.len());
1491        assert_eq!(2, job_scheduler.num_jobs());
1492
1493        // 5 files for next compaction.
1494        apply_edit(
1495            &version_control,
1496            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1497            &[],
1498            purger.clone(),
1499        );
1500        let (tx, _rx) = oneshot::channel();
1501        // The task is pending.
1502        scheduler
1503            .schedule_compaction(
1504                region_id,
1505                compact_request::Options::Regular(Default::default()),
1506                &version_control,
1507                &env.access_layer,
1508                OptionOutputTx::new(Some(OutputTx::new(tx))),
1509                &manifest_ctx,
1510                schema_metadata_manager,
1511                1,
1512            )
1513            .await
1514            .unwrap();
1515        assert_eq!(2, job_scheduler.num_jobs());
1516        assert!(
1517            !scheduler
1518                .region_status
1519                .get(&builder.region_id())
1520                .unwrap()
1521                .waiters
1522                .is_empty()
1523        );
1524    }
1525
1526    #[tokio::test]
1527    async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() {
1528        common_telemetry::init_default_ut_logging();
1529        let env = SchedulerEnv::new()
1530            .await
1531            .scheduler(Arc::new(FailingScheduler));
1532        let (tx, _rx) = mpsc::channel(4);
1533        let mut scheduler = env.mock_compaction_scheduler(tx);
1534        let mut builder = VersionControlBuilder::new();
1535        let end = 1000 * 1000;
1536        let version_control = Arc::new(
1537            builder
1538                .push_l0_file(0, end)
1539                .push_l0_file(10, end)
1540                .push_l0_file(50, end)
1541                .push_l0_file(80, end)
1542                .push_l0_file(90, end)
1543                .build(),
1544        );
1545        let region_id = builder.region_id();
1546        let manifest_ctx = env
1547            .mock_manifest_context(version_control.current().version.metadata.clone())
1548            .await;
1549        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1550        schema_metadata_manager
1551            .register_region_table_info(
1552                builder.region_id().table_id(),
1553                "test_table",
1554                "test_catalog",
1555                "test_schema",
1556                None,
1557                kv_backend,
1558            )
1559            .await;
1560
1561        let result = scheduler
1562            .schedule_compaction(
1563                region_id,
1564                compact_request::Options::Regular(Default::default()),
1565                &version_control,
1566                &env.access_layer,
1567                OptionOutputTx::none(),
1568                &manifest_ctx,
1569                schema_metadata_manager,
1570                1,
1571            )
1572            .await;
1573
1574        assert!(result.is_err());
1575        assert!(!scheduler.region_status.contains_key(&region_id));
1576    }
1577
1578    #[tokio::test]
1579    async fn test_manual_compaction_when_compaction_in_progress() {
1580        common_telemetry::init_default_ut_logging();
1581        let job_scheduler = Arc::new(VecScheduler::default());
1582        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1583        let (tx, _rx) = mpsc::channel(4);
1584        let mut scheduler = env.mock_compaction_scheduler(tx);
1585        let mut builder = VersionControlBuilder::new();
1586        let purger = builder.file_purger();
1587        let region_id = builder.region_id();
1588
1589        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1590        schema_metadata_manager
1591            .register_region_table_info(
1592                builder.region_id().table_id(),
1593                "test_table",
1594                "test_catalog",
1595                "test_schema",
1596                None,
1597                kv_backend,
1598            )
1599            .await;
1600
1601        // 5 files to compact.
1602        let end = 1000 * 1000;
1603        let version_control = Arc::new(
1604            builder
1605                .push_l0_file(0, end)
1606                .push_l0_file(10, end)
1607                .push_l0_file(50, end)
1608                .push_l0_file(80, end)
1609                .push_l0_file(90, end)
1610                .build(),
1611        );
1612        let manifest_ctx = env
1613            .mock_manifest_context(version_control.current().version.metadata.clone())
1614            .await;
1615
1616        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
1617            .files
1618            .values()
1619            .map(|file| file.meta_ref().clone())
1620            .collect();
1621
1622        // 5 files for next compaction and removes old files.
1623        apply_edit(
1624            &version_control,
1625            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1626            &file_metas,
1627            purger.clone(),
1628        );
1629
1630        scheduler
1631            .schedule_compaction(
1632                region_id,
1633                compact_request::Options::Regular(Default::default()),
1634                &version_control,
1635                &env.access_layer,
1636                OptionOutputTx::none(),
1637                &manifest_ctx,
1638                schema_metadata_manager.clone(),
1639                1,
1640            )
1641            .await
1642            .unwrap();
1643        // Should schedule 1 compaction.
1644        assert_eq!(1, scheduler.region_status.len());
1645        assert_eq!(1, job_scheduler.num_jobs());
1646        assert!(
1647            scheduler
1648                .region_status
1649                .get(&region_id)
1650                .unwrap()
1651                .pending_request
1652                .is_none()
1653        );
1654
1655        // Schedule another manual compaction.
1656        let (tx, _rx) = oneshot::channel();
1657        scheduler
1658            .schedule_compaction(
1659                region_id,
1660                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
1661                &version_control,
1662                &env.access_layer,
1663                OptionOutputTx::new(Some(OutputTx::new(tx))),
1664                &manifest_ctx,
1665                schema_metadata_manager.clone(),
1666                1,
1667            )
1668            .await
1669            .unwrap();
1670        assert_eq!(1, scheduler.region_status.len());
1671        // Current job num should be 1 since compaction is in progress.
1672        assert_eq!(1, job_scheduler.num_jobs());
1673        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
1674        assert!(status.pending_request.is_some());
1675
1676        // On compaction finished and schedule next compaction.
1677        scheduler
1678            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
1679            .await;
1680        assert_eq!(1, scheduler.region_status.len());
1681        assert_eq!(2, job_scheduler.num_jobs());
1682
1683        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
1684        assert!(status.pending_request.is_none());
1685    }
1686
1687    #[tokio::test]
1688    async fn test_compaction_bypass_in_staging_mode() {
1689        let env = SchedulerEnv::new().await;
1690        let (tx, _rx) = mpsc::channel(4);
1691        let mut scheduler = env.mock_compaction_scheduler(tx);
1692
1693        // Create version control and manifest context for staging mode
1694        let builder = VersionControlBuilder::new();
1695        let version_control = Arc::new(builder.build());
1696        let region_id = version_control.current().version.metadata.region_id;
1697
1698        // Create staging manifest context using the same pattern as SchedulerEnv
1699        let staging_manifest_ctx = {
1700            let manager = RegionManifestManager::new(
1701                version_control.current().version.metadata.clone(),
1702                0,
1703                RegionManifestOptions {
1704                    manifest_dir: "".to_string(),
1705                    object_store: env.access_layer.object_store().clone(),
1706                    compress_type: CompressionType::Uncompressed,
1707                    checkpoint_distance: 10,
1708                    remove_file_options: Default::default(),
1709                    manifest_cache: None,
1710                },
1711                FormatType::PrimaryKey,
1712                &Default::default(),
1713            )
1714            .await
1715            .unwrap();
1716            Arc::new(ManifestContext::new(
1717                manager,
1718                RegionRoleState::Leader(RegionLeaderState::Staging),
1719            ))
1720        };
1721
1722        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1723
1724        // Test regular compaction bypass in staging mode
1725        let (tx, rx) = oneshot::channel();
1726        scheduler
1727            .schedule_compaction(
1728                region_id,
1729                compact_request::Options::Regular(Default::default()),
1730                &version_control,
1731                &env.access_layer,
1732                OptionOutputTx::new(Some(OutputTx::new(tx))),
1733                &staging_manifest_ctx,
1734                schema_metadata_manager,
1735                1,
1736            )
1737            .await
1738            .unwrap();
1739
1740        let result = rx.await.unwrap();
1741        assert_eq!(result.unwrap(), 0); // is there a better way to check this?
1742        assert_eq!(0, scheduler.region_status.len());
1743    }
1744
1745    #[tokio::test]
1746    async fn test_add_ddl_request_to_pending() {
1747        let env = SchedulerEnv::new().await;
1748        let (tx, _rx) = mpsc::channel(4);
1749        let mut scheduler = env.mock_compaction_scheduler(tx);
1750        let builder = VersionControlBuilder::new();
1751        let version_control = Arc::new(builder.build());
1752        let region_id = builder.region_id();
1753
1754        scheduler.region_status.insert(
1755            region_id,
1756            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1757        );
1758        scheduler
1759            .region_status
1760            .get_mut(&region_id)
1761            .unwrap()
1762            .start_local_task();
1763
1764        let (output_tx, _output_rx) = oneshot::channel();
1765        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1766            region_id,
1767            sender: OptionOutputTx::from(output_tx),
1768            request: crate::request::DdlRequest::EnterStaging(
1769                store_api::region_request::EnterStagingRequest {
1770                    partition_directive:
1771                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1772                },
1773            ),
1774        });
1775
1776        assert!(scheduler.has_pending_ddls(region_id));
1777    }
1778
1779    #[tokio::test]
1780    async fn test_request_cancel_state_transitions() {
1781        let env = SchedulerEnv::new().await;
1782        let builder = VersionControlBuilder::new();
1783        let region_id = builder.region_id();
1784        let version_control = Arc::new(builder.build());
1785        let mut status =
1786            CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1787        let state = status.start_local_task();
1788
1789        assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued);
1790        assert!(state.cancel_handle().is_cancelled());
1791        assert_eq!(
1792            status.request_cancel(),
1793            RequestCancelResult::AlreadyCancelling
1794        );
1795
1796        assert!(!state.mark_commit_started());
1797        assert_eq!(
1798            status.request_cancel(),
1799            RequestCancelResult::AlreadyCancelling
1800        );
1801
1802        assert!(status.clear_running_task());
1803        assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning);
1804    }
1805
1806    #[tokio::test]
1807    async fn test_request_cancel_remote_compaction_is_too_late() {
1808        let env = SchedulerEnv::new().await;
1809        let builder = VersionControlBuilder::new();
1810        let region_id = builder.region_id();
1811        let version_control = Arc::new(builder.build());
1812        let mut status =
1813            CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1814
1815        status.start_remote_task();
1816
1817        assert_eq!(
1818            status.request_cancel(),
1819            RequestCancelResult::TooLateToCancel
1820        );
1821        assert!(status.active_compaction.is_some());
1822    }
1823
1824    #[tokio::test]
1825    async fn test_on_compaction_cancelled_returns_pending_ddl_requests() {
1826        let job_scheduler = Arc::new(VecScheduler::default());
1827        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1828        let (tx, _rx) = mpsc::channel(4);
1829        let mut scheduler = env.mock_compaction_scheduler(tx);
1830        let builder = VersionControlBuilder::new();
1831        let version_control = Arc::new(builder.build());
1832        let region_id = builder.region_id();
1833        let _manifest_ctx = env
1834            .mock_manifest_context(version_control.current().version.metadata.clone())
1835            .await;
1836        let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1837
1838        scheduler.region_status.insert(
1839            region_id,
1840            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1841        );
1842        scheduler
1843            .region_status
1844            .get_mut(&region_id)
1845            .unwrap()
1846            .start_local_task();
1847
1848        let (output_tx, _output_rx) = oneshot::channel();
1849        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1850            region_id,
1851            sender: OptionOutputTx::from(output_tx),
1852            request: crate::request::DdlRequest::EnterStaging(
1853                store_api::region_request::EnterStagingRequest {
1854                    partition_directive:
1855                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1856                },
1857            ),
1858        });
1859
1860        let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
1861
1862        assert_eq!(pending_ddls.len(), 1);
1863        assert!(!scheduler.has_pending_ddls(region_id));
1864        assert!(!scheduler.region_status.contains_key(&region_id));
1865        assert_eq!(job_scheduler.num_jobs(), 0);
1866    }
1867
1868    #[tokio::test]
1869    async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() {
1870        let job_scheduler = Arc::new(VecScheduler::default());
1871        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1872        let (tx, _rx) = mpsc::channel(4);
1873        let mut scheduler = env.mock_compaction_scheduler(tx);
1874        let builder = VersionControlBuilder::new();
1875        let version_control = Arc::new(builder.build());
1876        let region_id = builder.region_id();
1877        let _manifest_ctx = env
1878            .mock_manifest_context(version_control.current().version.metadata.clone())
1879            .await;
1880        let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1881
1882        scheduler.region_status.insert(
1883            region_id,
1884            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1885        );
1886        let status = scheduler.region_status.get_mut(&region_id).unwrap();
1887        status.start_local_task();
1888        let (manual_tx, manual_rx) = oneshot::channel();
1889        status.set_pending_request(PendingCompaction {
1890            options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
1891            waiter: OptionOutputTx::from(manual_tx),
1892            max_parallelism: 1,
1893        });
1894
1895        let (output_tx, _output_rx) = oneshot::channel();
1896        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1897            region_id,
1898            sender: OptionOutputTx::from(output_tx),
1899            request: crate::request::DdlRequest::EnterStaging(
1900                store_api::region_request::EnterStagingRequest {
1901                    partition_directive:
1902                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1903                },
1904            ),
1905        });
1906
1907        let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
1908
1909        assert_eq!(pending_ddls.len(), 1);
1910        assert!(!scheduler.region_status.contains_key(&region_id));
1911        assert_eq!(job_scheduler.num_jobs(), 0);
1912        assert_matches!(manual_rx.await.unwrap(), Err(_));
1913    }
1914
1915    #[tokio::test]
1916    async fn test_pending_ddl_request_failed_on_compaction_failed() {
1917        let env = SchedulerEnv::new().await;
1918        let (tx, _rx) = mpsc::channel(4);
1919        let mut scheduler = env.mock_compaction_scheduler(tx);
1920        let builder = VersionControlBuilder::new();
1921        let version_control = Arc::new(builder.build());
1922        let region_id = builder.region_id();
1923
1924        scheduler.region_status.insert(
1925            region_id,
1926            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1927        );
1928
1929        let (output_tx, output_rx) = oneshot::channel();
1930        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1931            region_id,
1932            sender: OptionOutputTx::from(output_tx),
1933            request: crate::request::DdlRequest::EnterStaging(
1934                store_api::region_request::EnterStagingRequest {
1935                    partition_directive:
1936                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1937                },
1938            ),
1939        });
1940
1941        assert!(scheduler.has_pending_ddls(region_id));
1942        scheduler
1943            .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1944
1945        assert!(!scheduler.has_pending_ddls(region_id));
1946        let result = output_rx.await.unwrap();
1947        assert_matches!(result, Err(_));
1948    }
1949
1950    #[tokio::test]
1951    async fn test_pending_ddl_request_failed_on_region_closed() {
1952        let env = SchedulerEnv::new().await;
1953        let (tx, _rx) = mpsc::channel(4);
1954        let mut scheduler = env.mock_compaction_scheduler(tx);
1955        let builder = VersionControlBuilder::new();
1956        let version_control = Arc::new(builder.build());
1957        let region_id = builder.region_id();
1958
1959        scheduler.region_status.insert(
1960            region_id,
1961            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1962        );
1963
1964        let (output_tx, output_rx) = oneshot::channel();
1965        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1966            region_id,
1967            sender: OptionOutputTx::from(output_tx),
1968            request: crate::request::DdlRequest::EnterStaging(
1969                store_api::region_request::EnterStagingRequest {
1970                    partition_directive:
1971                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1972                },
1973            ),
1974        });
1975
1976        assert!(scheduler.has_pending_ddls(region_id));
1977        scheduler.on_region_closed(region_id);
1978
1979        assert!(!scheduler.has_pending_ddls(region_id));
1980        let result = output_rx.await.unwrap();
1981        assert_matches!(result, Err(_));
1982    }
1983
1984    #[tokio::test]
1985    async fn test_pending_ddl_request_failed_on_region_dropped() {
1986        let env = SchedulerEnv::new().await;
1987        let (tx, _rx) = mpsc::channel(4);
1988        let mut scheduler = env.mock_compaction_scheduler(tx);
1989        let builder = VersionControlBuilder::new();
1990        let version_control = Arc::new(builder.build());
1991        let region_id = builder.region_id();
1992
1993        scheduler.region_status.insert(
1994            region_id,
1995            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1996        );
1997
1998        let (output_tx, output_rx) = oneshot::channel();
1999        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2000            region_id,
2001            sender: OptionOutputTx::from(output_tx),
2002            request: crate::request::DdlRequest::EnterStaging(
2003                store_api::region_request::EnterStagingRequest {
2004                    partition_directive:
2005                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2006                },
2007            ),
2008        });
2009
2010        assert!(scheduler.has_pending_ddls(region_id));
2011        scheduler.on_region_dropped(region_id);
2012
2013        assert!(!scheduler.has_pending_ddls(region_id));
2014        let result = output_rx.await.unwrap();
2015        assert_matches!(result, Err(_));
2016    }
2017
2018    #[tokio::test]
2019    async fn test_pending_ddl_request_failed_on_region_truncated() {
2020        let env = SchedulerEnv::new().await;
2021        let (tx, _rx) = mpsc::channel(4);
2022        let mut scheduler = env.mock_compaction_scheduler(tx);
2023        let builder = VersionControlBuilder::new();
2024        let version_control = Arc::new(builder.build());
2025        let region_id = builder.region_id();
2026
2027        scheduler.region_status.insert(
2028            region_id,
2029            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2030        );
2031
2032        let (output_tx, output_rx) = oneshot::channel();
2033        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2034            region_id,
2035            sender: OptionOutputTx::from(output_tx),
2036            request: crate::request::DdlRequest::EnterStaging(
2037                store_api::region_request::EnterStagingRequest {
2038                    partition_directive:
2039                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2040                },
2041            ),
2042        });
2043
2044        assert!(scheduler.has_pending_ddls(region_id));
2045        scheduler.on_region_truncated(region_id);
2046
2047        assert!(!scheduler.has_pending_ddls(region_id));
2048        let result = output_rx.await.unwrap();
2049        assert_matches!(result, Err(_));
2050    }
2051
2052    #[tokio::test]
2053    async fn test_on_compaction_finished_returns_pending_ddl_requests() {
2054        let job_scheduler = Arc::new(VecScheduler::default());
2055        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
2056        let (tx, _rx) = mpsc::channel(4);
2057        let mut scheduler = env.mock_compaction_scheduler(tx);
2058        let builder = VersionControlBuilder::new();
2059        let version_control = Arc::new(builder.build());
2060        let region_id = builder.region_id();
2061        let manifest_ctx = env
2062            .mock_manifest_context(version_control.current().version.metadata.clone())
2063            .await;
2064        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2065
2066        scheduler.region_status.insert(
2067            region_id,
2068            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2069        );
2070        scheduler
2071            .region_status
2072            .get_mut(&region_id)
2073            .unwrap()
2074            .start_local_task();
2075
2076        let (output_tx, _output_rx) = oneshot::channel();
2077        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2078            region_id,
2079            sender: OptionOutputTx::from(output_tx),
2080            request: crate::request::DdlRequest::EnterStaging(
2081                store_api::region_request::EnterStagingRequest {
2082                    partition_directive:
2083                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2084                },
2085            ),
2086        });
2087
2088        let pending_ddls = scheduler
2089            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2090            .await;
2091
2092        assert_eq!(pending_ddls.len(), 1);
2093        assert!(!scheduler.has_pending_ddls(region_id));
2094        assert!(!scheduler.region_status.contains_key(&region_id));
2095        assert_eq!(job_scheduler.num_jobs(), 0);
2096    }
2097
2098    #[tokio::test]
2099    async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
2100        let env = SchedulerEnv::new().await;
2101        let (tx, _rx) = mpsc::channel(4);
2102        let mut scheduler = env.mock_compaction_scheduler(tx);
2103        let builder = VersionControlBuilder::new();
2104        let version_control = Arc::new(builder.build());
2105        let region_id = builder.region_id();
2106        let manifest_ctx = env
2107            .mock_manifest_context(version_control.current().version.metadata.clone())
2108            .await;
2109        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2110
2111        let (manual_tx, manual_rx) = oneshot::channel();
2112        let mut status =
2113            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
2114        status.start_local_task();
2115        status.set_pending_request(PendingCompaction {
2116            options: compact_request::Options::Regular(Default::default()),
2117            waiter: OptionOutputTx::from(manual_tx),
2118            max_parallelism: 1,
2119        });
2120        scheduler.region_status.insert(region_id, status);
2121
2122        let (ddl_tx, _ddl_rx) = oneshot::channel();
2123        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2124            region_id,
2125            sender: OptionOutputTx::from(ddl_tx),
2126            request: crate::request::DdlRequest::EnterStaging(
2127                store_api::region_request::EnterStagingRequest {
2128                    partition_directive:
2129                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2130                },
2131            ),
2132        });
2133
2134        let pending_ddls = scheduler
2135            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2136            .await;
2137
2138        assert_eq!(pending_ddls.len(), 1);
2139        assert!(!scheduler.region_status.contains_key(&region_id));
2140        assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
2141    }
2142
2143    #[tokio::test]
2144    async fn test_on_compaction_finished_returns_empty_when_region_absent() {
2145        let env = SchedulerEnv::new().await;
2146        let (tx, _rx) = mpsc::channel(4);
2147        let mut scheduler = env.mock_compaction_scheduler(tx);
2148        let builder = VersionControlBuilder::new();
2149        let region_id = builder.region_id();
2150        let version_control = Arc::new(builder.build());
2151        let manifest_ctx = env
2152            .mock_manifest_context(version_control.current().version.metadata.clone())
2153            .await;
2154        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2155
2156        let pending_ddls = scheduler
2157            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2158            .await;
2159
2160        assert!(pending_ddls.is_empty());
2161    }
2162
2163    #[tokio::test]
2164    async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
2165        let env = SchedulerEnv::new()
2166            .await
2167            .scheduler(Arc::new(FailingScheduler));
2168        let (tx, _rx) = mpsc::channel(4);
2169        let mut scheduler = env.mock_compaction_scheduler(tx);
2170        let mut builder = VersionControlBuilder::new();
2171        let end = 1000 * 1000;
2172        let version_control = Arc::new(
2173            builder
2174                .push_l0_file(0, end)
2175                .push_l0_file(10, end)
2176                .push_l0_file(50, end)
2177                .push_l0_file(80, end)
2178                .push_l0_file(90, end)
2179                .build(),
2180        );
2181        let region_id = builder.region_id();
2182        let manifest_ctx = env
2183            .mock_manifest_context(version_control.current().version.metadata.clone())
2184            .await;
2185        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2186
2187        let (manual_tx, manual_rx) = oneshot::channel();
2188        let mut status =
2189            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
2190        status.start_local_task();
2191        status.set_pending_request(PendingCompaction {
2192            options: compact_request::Options::Regular(Default::default()),
2193            waiter: OptionOutputTx::from(manual_tx),
2194            max_parallelism: 1,
2195        });
2196        scheduler.region_status.insert(region_id, status);
2197
2198        let (ddl_tx, ddl_rx) = oneshot::channel();
2199        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2200            region_id,
2201            sender: OptionOutputTx::from(ddl_tx),
2202            request: crate::request::DdlRequest::EnterStaging(
2203                store_api::region_request::EnterStagingRequest {
2204                    partition_directive:
2205                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2206                },
2207            ),
2208        });
2209
2210        let pending_ddls = scheduler
2211            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2212            .await;
2213
2214        assert!(pending_ddls.is_empty());
2215        assert!(!scheduler.region_status.contains_key(&region_id));
2216        assert!(manual_rx.await.is_err());
2217        assert_matches!(ddl_rx.await.unwrap(), Err(_));
2218    }
2219
2220    #[tokio::test]
2221    async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
2222        let env = SchedulerEnv::new().await;
2223        let (tx, _rx) = mpsc::channel(4);
2224        let mut scheduler = env.mock_compaction_scheduler(tx);
2225        let builder = VersionControlBuilder::new();
2226        let version_control = Arc::new(builder.build());
2227        let region_id = builder.region_id();
2228        let manifest_ctx = env
2229            .mock_manifest_context(version_control.current().version.metadata.clone())
2230            .await;
2231        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2232
2233        scheduler.region_status.insert(
2234            region_id,
2235            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2236        );
2237        scheduler
2238            .region_status
2239            .get_mut(&region_id)
2240            .unwrap()
2241            .start_local_task();
2242
2243        let pending_ddls = scheduler
2244            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2245            .await;
2246
2247        assert!(pending_ddls.is_empty());
2248        assert!(!scheduler.region_status.contains_key(&region_id));
2249    }
2250
2251    #[tokio::test]
2252    async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
2253        let env = SchedulerEnv::new()
2254            .await
2255            .scheduler(Arc::new(FailingScheduler));
2256        let (tx, _rx) = mpsc::channel(4);
2257        let mut scheduler = env.mock_compaction_scheduler(tx);
2258        let mut builder = VersionControlBuilder::new();
2259        let end = 1000 * 1000;
2260        let version_control = Arc::new(
2261            builder
2262                .push_l0_file(0, end)
2263                .push_l0_file(10, end)
2264                .push_l0_file(50, end)
2265                .push_l0_file(80, end)
2266                .push_l0_file(90, end)
2267                .build(),
2268        );
2269        let region_id = builder.region_id();
2270        let manifest_ctx = env
2271            .mock_manifest_context(version_control.current().version.metadata.clone())
2272            .await;
2273        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2274
2275        scheduler.region_status.insert(
2276            region_id,
2277            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2278        );
2279        scheduler
2280            .region_status
2281            .get_mut(&region_id)
2282            .unwrap()
2283            .start_local_task();
2284
2285        let pending_ddls = scheduler
2286            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2287            .await;
2288
2289        assert!(pending_ddls.is_empty());
2290        assert!(!scheduler.region_status.contains_key(&region_id));
2291    }
2292
2293    #[tokio::test]
2294    async fn test_concurrent_memory_competition() {
2295        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB
2296        let barrier = Arc::new(Barrier::new(3));
2297        let mut handles = vec![];
2298
2299        // Spawn 3 tasks competing for memory, each trying to acquire 2MB
2300        for _i in 0..3 {
2301            let mgr = manager.clone();
2302            let bar = barrier.clone();
2303            let handle = tokio::spawn(async move {
2304                bar.wait().await; // Synchronize start
2305                mgr.try_acquire(2 * 1024 * 1024)
2306            });
2307            handles.push(handle);
2308        }
2309
2310        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
2311            .await
2312            .into_iter()
2313            .map(|r| r.unwrap())
2314            .collect();
2315
2316        // Only 1 should succeed (3MB limit, 2MB request, can only fit one)
2317        let succeeded = results.iter().filter(|r| r.is_some()).count();
2318        let failed = results.iter().filter(|r| r.is_none()).count();
2319
2320        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
2321        assert_eq!(failed, 2, "Expected 2 tasks to fail");
2322
2323        // Clean up
2324        drop(results);
2325        assert_eq!(manager.used_bytes(), 0);
2326    }
2327}