1mod buckets;
16pub mod compactor;
17pub mod memory_manager;
18pub mod picker;
19pub mod run;
20mod task;
21#[cfg(test)]
22mod test_util;
23mod twcs;
24mod window;
25
26use std::collections::HashMap;
27use std::sync::{Arc, Mutex};
28use std::time::Instant;
29
30use api::v1::region::compact_request;
31use api::v1::region::compact_request::Options;
32use arrow_schema::Schema;
33use common_base::Plugins;
34use common_base::cancellation::CancellationHandle;
35use common_memory_manager::OnExhaustedPolicy;
36use common_meta::key::SchemaMetadataManagerRef;
37use common_telemetry::{debug, error, info, warn};
38use common_time::range::TimestampRange;
39use common_time::timestamp::TimeUnit;
40use common_time::{TimeToLive, Timestamp};
41use datafusion_common::ScalarValue;
42use datafusion_expr::Expr;
43use datatypes::extension::json::is_json_extension_type;
44use datatypes::schema::ext::ArrowSchemaExt;
45use datatypes::types::json_type::JsonNativeType;
46use parquet::arrow::parquet_to_arrow_schema;
47use parquet::file::metadata::PageIndexPolicy;
48use serde::{Deserialize, Serialize};
49use snafu::{OptionExt, ResultExt};
50use store_api::metadata::RegionMetadataRef;
51use store_api::storage::RegionId;
52use task::MAX_PARALLEL_COMPACTION;
53use tokio::sync::mpsc::{self, Sender};
54
55use crate::access_layer::AccessLayerRef;
56use crate::cache::{CacheManagerRef, CacheStrategy};
57use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
58use crate::compaction::memory_manager::CompactionMemoryManager;
59use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
60use crate::compaction::task::CompactionTaskImpl;
61use crate::config::MitoConfig;
62use crate::error::{
63 CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error,
64 GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, ParquetToArrowSchemaSnafu,
65 RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
66 TimeRangePredicateOverflowSnafu, TimeoutSnafu,
67};
68use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
69use crate::read::FlatSource;
70use crate::read::flat_projection::FlatProjectionMapper;
71use crate::read::read_columns::ReadColumns;
72use crate::read::scan_region::{PredicateGroup, ScanInput};
73use crate::read::seq_scan::SeqScan;
74use crate::region::options::{MergeMode, RegionOptions};
75use crate::region::version::VersionControlRef;
76use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
77use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
78use crate::schedule::remote_job_scheduler::{
79 CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
80};
81use crate::schedule::scheduler::SchedulerRef;
82use crate::sst::file::{FileHandle, FileMeta, Level};
83use crate::sst::parquet::reader::MetadataCacheMetrics;
84use crate::sst::version::LevelMeta;
85use crate::worker::WorkerListener;
86
/// Everything needed to pick input files for a region and run one compaction on them.
pub struct CompactionRequest {
    /// Engine configuration shared with the compaction task.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Region version the compaction operates on.
    pub(crate) current_version: CompactionVersion,
    /// Access layer of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Sender for [`WorkerRequestWithTime`] messages.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Senders to notify with the result of the compaction.
    pub(crate) waiters: Vec<OutputTx>,
    /// Instant at which the request was created.
    pub(crate) start_time: Instant,
    /// Cache manager handed to the compaction region.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Listener handed to the compaction task.
    pub(crate) listener: WorkerListener,
    /// Manager used to look up schema-level options when resolving dynamic options.
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    /// Parallelism limit forwarded to the compaction region.
    pub(crate) max_parallelism: usize,
}
104
105impl CompactionRequest {
106 pub(crate) fn region_id(&self) -> RegionId {
107 self.current_version.metadata.region_id
108 }
109}
110
/// Schedules compactions for regions and tracks the compaction status of each region.
pub(crate) struct CompactionScheduler {
    /// Scheduler that runs submitted local compaction jobs.
    scheduler: SchedulerRef,
    /// Status of every region currently tracked by this scheduler.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Sender cloned into each compaction request.
    request_sender: Sender<WorkerRequestWithTime>,
    /// Cache manager cloned into each compaction request.
    cache_manager: CacheManagerRef,
    /// Engine configuration cloned into each compaction request.
    engine_config: Arc<MitoConfig>,
    /// Memory manager handed to local compaction tasks.
    memory_manager: Arc<CompactionMemoryManager>,
    /// Memory exhaustion policy handed to local compaction tasks.
    memory_policy: OnExhaustedPolicy,
    /// Listener notified when a compaction is scheduled.
    listener: WorkerListener,
    /// Plugins; queried for a [`RemoteJobSchedulerRef`] when remote compaction is enabled.
    plugins: Plugins,
}
126
127impl CompactionScheduler {
128 #[allow(clippy::too_many_arguments)]
129 pub(crate) fn new(
130 scheduler: SchedulerRef,
131 request_sender: Sender<WorkerRequestWithTime>,
132 cache_manager: CacheManagerRef,
133 engine_config: Arc<MitoConfig>,
134 listener: WorkerListener,
135 plugins: Plugins,
136 memory_manager: Arc<CompactionMemoryManager>,
137 memory_policy: OnExhaustedPolicy,
138 ) -> Self {
139 Self {
140 scheduler,
141 region_status: HashMap::new(),
142 request_sender,
143 cache_manager,
144 engine_config,
145 memory_manager,
146 memory_policy,
147 listener,
148 plugins,
149 }
150 }
151
152 #[allow(clippy::too_many_arguments)]
155 pub(crate) async fn schedule_compaction(
156 &mut self,
157 region_id: RegionId,
158 compact_options: compact_request::Options,
159 version_control: &VersionControlRef,
160 access_layer: &AccessLayerRef,
161 waiter: OptionOutputTx,
162 manifest_ctx: &ManifestContextRef,
163 schema_metadata_manager: SchemaMetadataManagerRef,
164 max_parallelism: usize,
165 ) -> Result<bool> {
166 let current_state = manifest_ctx.current_state();
168 if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
169 info!(
170 "Skipping compaction for region {} in staging mode, options: {:?}",
171 region_id, compact_options
172 );
173 waiter.send(Ok(0));
174 return Ok(false);
175 }
176
177 if let Some(status) = self.region_status.get_mut(®ion_id) {
178 match compact_options {
179 Options::Regular(_) => {
180 status.merge_waiter(waiter);
182 }
183 options @ Options::StrictWindow(_) => {
184 status.set_pending_request(PendingCompaction {
186 options,
187 waiter,
188 max_parallelism,
189 });
190 info!(
191 "Region {} is compacting, manually compaction will be re-scheduled.",
192 region_id
193 );
194 }
195 }
196 return Ok(false);
197 }
198
199 let mut status =
201 CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
202 let request = status.new_compaction_request(
203 self.request_sender.clone(),
204 waiter,
205 self.engine_config.clone(),
206 self.cache_manager.clone(),
207 manifest_ctx,
208 self.listener.clone(),
209 schema_metadata_manager,
210 max_parallelism,
211 );
212
213 match self
214 .schedule_compaction_request(request, compact_options)
215 .await
216 {
217 Ok(Some(active_compaction)) => {
218 status.active_compaction = Some(active_compaction);
222 self.region_status.insert(region_id, status);
223
224 self.listener.on_compaction_scheduled(region_id);
225 Ok(true)
226 }
227 Ok(None) => Ok(false),
228 Err(e) => Err(e),
229 }
230 }
231
232 pub(crate) async fn handle_pending_compaction_request(
236 &mut self,
237 region_id: RegionId,
238 manifest_ctx: &ManifestContextRef,
239 schema_metadata_manager: SchemaMetadataManagerRef,
240 ) -> bool {
241 let Some(status) = self.region_status.get_mut(®ion_id) else {
242 return true;
243 };
244
245 let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
248 return false;
249 };
250
251 let PendingCompaction {
252 options,
253 waiter,
254 max_parallelism,
255 } = pending_request;
256
257 let request = {
258 status.new_compaction_request(
259 self.request_sender.clone(),
260 waiter,
261 self.engine_config.clone(),
262 self.cache_manager.clone(),
263 manifest_ctx,
264 self.listener.clone(),
265 schema_metadata_manager,
266 max_parallelism,
267 )
268 };
269
270 match self.schedule_compaction_request(request, options).await {
271 Ok(Some(active_compaction)) => {
272 let status = self.region_status.get_mut(®ion_id).unwrap();
273 status.active_compaction = Some(active_compaction);
274 debug!(
275 "Successfully scheduled manual compaction for region id: {}",
276 region_id
277 );
278 true
279 }
280 Ok(None) => {
281 false
284 }
285 Err(e) => {
286 error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
287 self.remove_region_on_failure(region_id, Arc::new(e));
288 true
289 }
290 }
291 }
292
293 pub(crate) async fn on_compaction_finished(
295 &mut self,
296 region_id: RegionId,
297 manifest_ctx: &ManifestContextRef,
298 schema_metadata_manager: SchemaMetadataManagerRef,
299 ) -> Vec<SenderDdlRequest> {
300 let Some(status) = self.region_status.get_mut(®ion_id) else {
301 return Vec::new();
302 };
303 status.clear_running_task();
304
305 if self
308 .handle_pending_compaction_request(
309 region_id,
310 manifest_ctx,
311 schema_metadata_manager.clone(),
312 )
313 .await
314 {
315 return Vec::new();
316 }
317
318 let Some(status) = self.region_status.get_mut(®ion_id) else {
319 return Vec::new();
322 };
323
324 for waiter in std::mem::take(&mut status.waiters) {
325 waiter.send(Ok(0));
326 }
327
328 let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
330 if !pending_ddl_requests.is_empty() {
331 self.region_status.remove(®ion_id);
332 return pending_ddl_requests;
335 }
336 Vec::new()
337 }
338
339 pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
340 self.region_status
341 .get(®ion_id)
342 .map(|status| status.active_compaction.is_some())
343 .unwrap_or(false)
344 }
345
346 pub(crate) async fn schedule_next_compaction(
349 &mut self,
350 region_id: RegionId,
351 manifest_ctx: &ManifestContextRef,
352 schema_metadata_manager: SchemaMetadataManagerRef,
353 ) -> bool {
354 let Some(status) = self.region_status.get_mut(®ion_id) else {
355 return false;
356 };
357
358 let request = status.new_compaction_request(
360 self.request_sender.clone(),
361 OptionOutputTx::none(),
362 self.engine_config.clone(),
363 self.cache_manager.clone(),
364 manifest_ctx,
365 self.listener.clone(),
366 schema_metadata_manager,
367 MAX_PARALLEL_COMPACTION,
368 );
369
370 match self
372 .schedule_compaction_request(
373 request,
374 compact_request::Options::Regular(Default::default()),
375 )
376 .await
377 {
378 Ok(Some(active_compaction)) => {
379 self.region_status
380 .get_mut(®ion_id)
381 .unwrap()
382 .active_compaction = Some(active_compaction);
383 debug!(
384 "Successfully scheduled next compaction for region id: {}",
385 region_id
386 );
387 true
388 }
389 Ok(None) => {
390 self.region_status.remove(®ion_id);
394 false
395 }
396 Err(e) => {
397 error!(e; "Failed to schedule next compaction for region {}", region_id);
398 self.remove_region_on_failure(region_id, Arc::new(e));
399 false
400 }
401 }
402 }
403
    /// Handles a cancelled compaction: stops tracking the region and returns the DDL
    /// requests that were pending on it.
    pub(crate) async fn on_compaction_cancelled(
        &mut self,
        region_id: RegionId,
    ) -> Vec<SenderDdlRequest> {
        self.remove_region_on_cancel(region_id)
    }
411
    /// Handles a failed compaction: logs the error, stops tracking the region and
    /// forwards `err` to everything waiting on it.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        self.remove_region_on_failure(region_id, err);
    }
417
418 pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
420 self.remove_region_on_failure(
421 region_id,
422 Arc::new(RegionDroppedSnafu { region_id }.build()),
423 );
424 }
425
426 pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
428 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
429 }
430
431 pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
433 self.remove_region_on_failure(
434 region_id,
435 Arc::new(RegionTruncatedSnafu { region_id }.build()),
436 );
437 }
438
439 pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
444 debug!(
445 "Added pending DDL request for region: {}, ddl: {:?}",
446 request.region_id, request.request
447 );
448 let status = self.region_status.get_mut(&request.region_id).unwrap();
449 status.pending_ddl_requests.push(request);
450 }
451
452 #[cfg(test)]
453 pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
454 let has_pending = self
455 .region_status
456 .get(®ion_id)
457 .map(|status| !status.pending_ddl_requests.is_empty())
458 .unwrap_or(false);
459 debug!(
460 "Checked pending DDL requests for region: {}, has_pending: {}",
461 region_id, has_pending
462 );
463 has_pending
464 }
465
466 pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult {
467 let Some(status) = self.region_status.get_mut(®ion_id) else {
468 return RequestCancelResult::NotRunning;
469 };
470
471 status.request_cancel()
472 }
473
    /// Picks input files for `request` and starts the compaction, as a remote job when
    /// remote compaction is enabled and available, otherwise as a local task.
    ///
    /// Returns:
    /// - `Ok(Some(ActiveCompaction::Remote))` when a remote job was scheduled;
    /// - `Ok(Some(ActiveCompaction::Local { .. }))` when a local task was submitted;
    /// - `Ok(None)` when the picker selected nothing (all waiters receive `Ok(0)`);
    /// - `Err(_)` when remote scheduling failed and falling back to local is disabled,
    ///   or when the local task could not be submitted.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<Option<ActiveCompaction>> {
        let region_id = request.region_id();
        // Resolving dynamic options is best effort: on failure fall back to the
        // compaction options and TTL stored in the region version.
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id,
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        // NOTE(review): `region_id` was already bound above; this rebinding is redundant.
        let region_id = request.region_id();
        // Take the request apart; the schema metadata manager is not needed any more.
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        // The region options used for compaction carry the resolved dynamic compaction
        // options; every other option comes from the current version.
        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        // The timer is scoped to this block so only the pick stage is measured.
        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact: report zero to every waiter.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            return Ok(None);
        };

        // Try remote compaction first when enabled. On the paths that fall back to local
        // compaction the waiters are recovered so the local task can notify them.
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(Some(ActiveCompaction::Remote));
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // The waiters were moved into the job; the error hands them back.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        let estimated_bytes = estimate_compaction_bytes(&picker_output);

        // The same cancel handle is shared by the tracked state and the compactor.
        let cancel_handle = Arc::new(CancellationHandle::default());
        let state = LocalCompactionState::new(cancel_handle.clone());
        let local_compaction_task = Box::new(CompactionTaskImpl {
            state: state.clone(),
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
            .map(|_| Some(ActiveCompaction::Local { state }))
    }
638
639 fn submit_compaction_task(
640 &mut self,
641 mut task: Box<CompactionTaskImpl>,
642 region_id: RegionId,
643 ) -> Result<()> {
644 self.scheduler
645 .schedule(Box::pin(async move {
646 INFLIGHT_COMPACTION_COUNT.inc();
647 task.run().await;
648 INFLIGHT_COMPACTION_COUNT.dec();
649 }))
650 .inspect_err(
651 |e| error!(e; "Failed to submit compaction request for region {}", region_id),
652 )
653 }
654
655 fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
656 let Some(status) = self.region_status.remove(®ion_id) else {
658 return;
659 };
660
661 status.on_failure(err);
663 }
664
665 fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec<SenderDdlRequest> {
666 let Some(status) = self.region_status.remove(®ion_id) else {
667 return Vec::new();
668 };
669
670 status.on_cancel()
671 }
672}
673
/// Shared state of a local compaction. Clones share the same cancel handle and the same
/// commit flag.
#[derive(Debug, Clone)]
pub(crate) struct LocalCompactionState {
    /// Handle used to request and observe cancellation.
    cancel_handle: Arc<CancellationHandle>,
    /// Set to `true` by `mark_commit_started`; cancellation is refused afterwards.
    commit_started: Arc<Mutex<bool>>,
}
679
/// The compaction currently running for a region.
#[derive(Debug)]
enum ActiveCompaction {
    /// A local task; `state` lets the scheduler request cancellation.
    Local { state: LocalCompactionState },
    /// A job handed to the remote job scheduler.
    Remote,
}
685
686impl LocalCompactionState {
687 fn new(cancel_handle: Arc<CancellationHandle>) -> Self {
688 Self {
689 cancel_handle,
690 commit_started: Arc::new(Mutex::new(false)),
691 }
692 }
693
694 pub(crate) fn cancel_handle(&self) -> Arc<CancellationHandle> {
696 self.cancel_handle.clone()
697 }
698
699 pub(crate) fn mark_commit_started(&self) -> bool {
705 let mut commit_started = self.commit_started.lock().unwrap();
706 if self.cancel_handle.is_cancelled() {
707 return false;
708 }
709 *commit_started = true;
710 true
711 }
712
713 pub(crate) fn request_cancel(&self) -> RequestCancelResult {
715 let commit_started = self.commit_started.lock().unwrap();
717 if *commit_started {
718 return RequestCancelResult::TooLateToCancel;
719 }
720 if self.cancel_handle.is_cancelled() {
721 return RequestCancelResult::AlreadyCancelling;
722 }
723
724 self.cancel_handle.cancel();
725 RequestCancelResult::CancelIssued
726 }
727}
728
/// Outcome of a request to cancel a compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCancelResult {
    /// Cancellation was requested on the cancel handle by this call.
    CancelIssued,
    /// Cancellation had already been requested before this call.
    AlreadyCancelling,
    /// The compaction can no longer be cancelled (commit started, or it runs remotely).
    TooLateToCancel,
    /// There is no active compaction to cancel.
    NotRunning,
}
736
737impl Drop for CompactionScheduler {
738 fn drop(&mut self) {
739 for (region_id, status) in self.region_status.drain() {
740 status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
742 }
743 }
744}
745
746async fn find_dynamic_options(
748 region_id: RegionId,
749 region_options: &crate::region::options::RegionOptions,
750 schema_metadata_manager: &SchemaMetadataManagerRef,
751) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
752 let table_id = region_id.table_id();
753 if let (true, Some(ttl)) = (region_options.compaction_override, region_options.ttl) {
754 debug!(
755 "Use region options directly for table {}: compaction={:?}, ttl={:?}",
756 table_id, region_options.compaction, region_options.ttl
757 );
758 return Ok((region_options.compaction.clone(), ttl));
759 }
760
761 let db_options = tokio::time::timeout(
762 crate::config::FETCH_OPTION_TIMEOUT,
763 schema_metadata_manager.get_schema_options_by_table_id(table_id),
764 )
765 .await
766 .context(TimeoutSnafu)?
767 .context(GetSchemaMetadataSnafu)?;
768
769 let ttl = if let Some(ttl) = region_options.ttl {
770 debug!(
771 "Use region TTL directly for table {}: ttl={:?}",
772 table_id, region_options.ttl
773 );
774 ttl
775 } else {
776 db_options
777 .as_ref()
778 .and_then(|options| options.ttl)
779 .unwrap_or_default()
780 .into()
781 };
782
783 let compaction = if !region_options.compaction_override {
784 if let Some(schema_opts) = db_options {
785 let map: HashMap<String, String> = schema_opts
786 .extra_options
787 .iter()
788 .filter_map(|(k, v)| {
789 if k.starts_with("compaction.") {
790 Some((k.clone(), v.clone()))
791 } else {
792 None
793 }
794 })
795 .collect();
796 if map.is_empty() {
797 region_options.compaction.clone()
798 } else {
799 crate::region::options::RegionOptions::try_from_options(region_id, &map)
800 .map(|o| o.compaction)
801 .unwrap_or_else(|e| {
802 error!(e; "Failed to create RegionOptions from map");
803 region_options.compaction.clone()
804 })
805 }
806 } else {
807 debug!(
808 "DB options is None for table {}, use region compaction: compaction={:?}",
809 table_id, region_options.compaction
810 );
811 region_options.compaction.clone()
812 }
813 } else {
814 debug!(
815 "No schema options for table {}, use region compaction: compaction={:?}",
816 table_id, region_options.compaction
817 );
818 region_options.compaction.clone()
819 };
820
821 debug!(
822 "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
823 table_id, compaction, ttl
824 );
825 Ok((compaction, ttl))
826}
827
/// Compaction state the scheduler keeps for one region.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region; its current version is read for each new request.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Waiters collected while a compaction is running; moved into the next request.
    waiters: Vec<OutputTx>,
    /// Pending manual compaction request, if any. A newer one replaces an older one.
    pending_request: Option<PendingCompaction>,
    /// DDL requests queued on this region.
    pending_ddl_requests: Vec<SenderDdlRequest>,
    /// The compaction currently running, if any.
    active_compaction: Option<ActiveCompaction>,
}
845
846impl CompactionStatus {
847 fn new(
849 region_id: RegionId,
850 version_control: VersionControlRef,
851 access_layer: AccessLayerRef,
852 ) -> CompactionStatus {
853 CompactionStatus {
854 region_id,
855 version_control,
856 access_layer,
857 waiters: Vec::new(),
858 pending_request: None,
859 pending_ddl_requests: Vec::new(),
860 active_compaction: None,
861 }
862 }
863
864 #[cfg(test)]
865 fn start_local_task(&mut self) -> LocalCompactionState {
866 let state = LocalCompactionState::new(Arc::new(CancellationHandle::default()));
867 self.active_compaction = Some(ActiveCompaction::Local {
868 state: state.clone(),
869 });
870 state
871 }
872
    /// Test helper: marks a remote compaction as active.
    #[cfg(test)]
    fn start_remote_task(&mut self) {
        self.active_compaction = Some(ActiveCompaction::Remote);
    }
877
878 fn request_cancel(&mut self) -> RequestCancelResult {
879 let Some(active_compaction) = &self.active_compaction else {
880 return RequestCancelResult::NotRunning;
881 };
882
883 match active_compaction {
884 ActiveCompaction::Local { state, .. } => state.request_cancel(),
885 ActiveCompaction::Remote => RequestCancelResult::TooLateToCancel,
886 }
887 }
888
    /// Clears the active compaction and returns whether there was one.
    fn clear_running_task(&mut self) -> bool {
        self.active_compaction.take().is_some()
    }
892
893 fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
895 if let Some(waiter) = waiter.take_inner() {
896 self.waiters.push(waiter);
897 }
898 }
899
900 fn set_pending_request(&mut self, pending: PendingCompaction) {
902 if let Some(prev) = self.pending_request.replace(pending) {
903 debug!(
904 "Replace pending compaction options with new request {:?} for region: {}",
905 prev.options, self.region_id
906 );
907 prev.waiter.send(ManualCompactionOverrideSnafu.fail());
908 }
909 }
910
911 fn on_failure(mut self, err: Arc<Error>) {
912 for waiter in self.waiters.drain(..) {
913 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
914 region_id: self.region_id,
915 }));
916 }
917
918 if let Some(pending_compaction) = self.pending_request {
919 pending_compaction
920 .waiter
921 .send(Err(err.clone()).context(CompactRegionSnafu {
922 region_id: self.region_id,
923 }));
924 }
925
926 for pending_ddl in self.pending_ddl_requests {
927 pending_ddl
928 .sender
929 .send(Err(err.clone()).context(CompactRegionSnafu {
930 region_id: self.region_id,
931 }));
932 }
933 }
934
935 #[must_use]
936 fn on_cancel(mut self) -> Vec<SenderDdlRequest> {
937 for waiter in self.waiters.drain(..) {
938 waiter.send(CompactionCancelledSnafu.fail());
939 }
940
941 if let Some(pending_compaction) = self.pending_request {
942 pending_compaction.waiter.send(
943 Err(Arc::new(CompactionCancelledSnafu.build())).context(CompactRegionSnafu {
944 region_id: self.region_id,
945 }),
946 );
947 }
948
949 std::mem::take(&mut self.pending_ddl_requests)
950 }
951
952 #[allow(clippy::too_many_arguments)]
956 fn new_compaction_request(
957 &mut self,
958 request_sender: Sender<WorkerRequestWithTime>,
959 mut waiter: OptionOutputTx,
960 engine_config: Arc<MitoConfig>,
961 cache_manager: CacheManagerRef,
962 manifest_ctx: &ManifestContextRef,
963 listener: WorkerListener,
964 schema_metadata_manager: SchemaMetadataManagerRef,
965 max_parallelism: usize,
966 ) -> CompactionRequest {
967 let current_version = CompactionVersion::from(self.version_control.current().version);
968 let start_time = Instant::now();
969 let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
970 waiters.extend(std::mem::take(&mut self.waiters));
971
972 if let Some(waiter) = waiter.take_inner() {
973 waiters.push(waiter);
974 }
975
976 CompactionRequest {
977 engine_config,
978 current_version,
979 access_layer: self.access_layer.clone(),
980 request_sender: request_sender.clone(),
981 waiters,
982 start_time,
983 cache_manager,
984 manifest_ctx: manifest_ctx.clone(),
985 listener,
986 schema_metadata_manager,
987 max_parallelism,
988 }
989 }
990}
991
/// One output of a compaction: the input files to merge and how to write the result.
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Level of the output.
    pub output_level: Level,
    /// Input files of this output.
    pub inputs: Vec<FileHandle>,
    /// Filter-deleted flag for this output.
    pub filter_deleted: bool,
    /// Optional time range of the output.
    pub output_time_range: Option<TimestampRange>,
}
1003
/// Serializable counterpart of [`CompactionOutput`] that stores file metadata instead of
/// file handles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    /// Level of the output.
    output_level: Level,
    /// Metadata of the input files.
    inputs: Vec<FileMeta>,
    /// Filter-deleted flag for this output.
    filter_deleted: bool,
    /// Optional time range of the output.
    output_time_range: Option<TimestampRange>,
}
1012
/// Builder of the reader that scans the input SST files of a compaction.
struct CompactionSstReaderBuilder<'a> {
    /// Metadata of the region; all of its columns are read.
    metadata: RegionMetadataRef,
    /// Access layer used to read the SST files.
    sst_layer: AccessLayerRef,
    /// Cache manager, used with the compaction cache strategy.
    cache: CacheManagerRef,
    /// Input files to read.
    inputs: &'a [FileHandle],
    /// Append mode flag forwarded to the scan input.
    append_mode: bool,
    /// Filter-deleted flag forwarded to the scan input.
    filter_deleted: bool,
    /// Optional time range; when present it becomes a predicate on the time index.
    time_range: Option<TimestampRange>,
    /// Merge mode forwarded to the scan input.
    merge_mode: MergeMode,
}
1024
1025impl CompactionSstReaderBuilder<'_> {
1026 async fn build_flat_sst_reader(self) -> Result<FlatSource> {
1029 let scan_input = self.build_scan_input().await?.with_compaction(true);
1030
1031 let schema = scan_input.mapper.output_schema();
1032 let schema = schema.arrow_schema();
1033
1034 let stream = SeqScan::new(scan_input)
1035 .build_flat_reader_for_compaction()
1036 .await?;
1037 Ok(FlatSource::new_stream(schema.clone(), stream))
1038 }
1039
1040 async fn build_scan_input(self) -> Result<ScanInput> {
1041 let schema = self.metadata.schema.arrow_schema();
1042 let json_type_hint = if schema.has_json_extension_field() {
1043 let mut json_type_hint = schema
1044 .fields()
1045 .iter()
1046 .filter(|&field| is_json_extension_type(field))
1047 .map(|field| (field.name().clone(), JsonNativeType::Null))
1048 .collect::<HashMap<_, _>>();
1049
1050 let schemas = self.collect_arrow_schemas_from_parquet().await?;
1051 for schema in schemas {
1052 for field in schema.fields() {
1053 let Some(merged) = json_type_hint.get_mut(field.name()) else {
1054 continue;
1055 };
1056
1057 let json_type = JsonNativeType::try_from(field.data_type())
1058 .context(DataTypeMismatchSnafu)?;
1059 merged.merge(&json_type);
1060 }
1061 }
1062
1063 Some(json_type_hint)
1064 } else {
1065 None
1066 };
1067
1068 let projection = (0..self.metadata.column_metadatas.len()).collect();
1069 let read_columns = ReadColumns::from_deduped_column_ids(
1070 self.metadata.column_metadatas.iter().map(|x| x.column_id),
1071 );
1072 let mapper = FlatProjectionMapper::new_with_read_columns(
1073 &self.metadata,
1074 projection,
1075 read_columns,
1076 json_type_hint.as_ref(),
1077 )?;
1078
1079 let mut scan_input = ScanInput::new(self.sst_layer, mapper)
1080 .with_files(self.inputs.to_vec())
1081 .with_append_mode(self.append_mode)
1082 .with_cache(CacheStrategy::Compaction(self.cache))
1084 .with_filter_deleted(self.filter_deleted)
1085 .with_ignore_file_not_found(true)
1087 .with_merge_mode(self.merge_mode);
1088
1089 if let Some(time_range) = self.time_range {
1092 scan_input =
1093 scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
1094 }
1095
1096 Ok(scan_input)
1097 }
1098
1099 async fn collect_arrow_schemas_from_parquet(&self) -> Result<Vec<Schema>> {
1100 let mut schemas = Vec::with_capacity(self.inputs.len());
1101
1102 for file_handle in self.inputs {
1103 let file_path =
1104 file_handle.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type());
1105 let file_size = file_handle.meta_ref().file_size;
1106 let parquet_metadata = match self
1107 .sst_layer
1108 .read_sst(file_handle.clone())
1109 .cache(CacheStrategy::Compaction(self.cache.clone()))
1110 .read_parquet_metadata(
1111 &file_path,
1112 file_size,
1113 &mut MetadataCacheMetrics::default(),
1114 PageIndexPolicy::default(),
1115 )
1116 .await
1117 .map(|x| x.0.parquet_metadata())
1118 {
1119 Ok(x) => x,
1120 Err(e) if e.is_object_not_found() => continue,
1121 Err(e) => return Err(e),
1122 };
1123 let file_metadata = parquet_metadata.file_metadata();
1124
1125 let schema = parquet_to_arrow_schema(
1126 file_metadata.schema_descr(),
1127 file_metadata.key_value_metadata(),
1128 )
1129 .context(ParquetToArrowSchemaSnafu { file: file_path })?;
1130
1131 schemas.push(schema);
1132 }
1133 Ok(schemas)
1134 }
1135}
1136
1137fn time_range_to_predicate(
1139 range: TimestampRange,
1140 metadata: &RegionMetadataRef,
1141) -> Result<PredicateGroup> {
1142 let ts_col = metadata.time_index_column();
1143
1144 let ts_col_unit = ts_col
1146 .column_schema
1147 .data_type
1148 .as_timestamp()
1149 .unwrap()
1150 .unit();
1151
1152 let exprs = match (range.start(), range.end()) {
1153 (Some(start), Some(end)) => {
1154 vec![
1155 datafusion_expr::col(ts_col.column_schema.name.clone())
1156 .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1157 datafusion_expr::col(ts_col.column_schema.name.clone())
1158 .lt(ts_to_lit(*end, ts_col_unit)?),
1159 ]
1160 }
1161 (Some(start), None) => {
1162 vec![
1163 datafusion_expr::col(ts_col.column_schema.name.clone())
1164 .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1165 ]
1166 }
1167
1168 (None, Some(end)) => {
1169 vec![
1170 datafusion_expr::col(ts_col.column_schema.name.clone())
1171 .lt(ts_to_lit(*end, ts_col_unit)?),
1172 ]
1173 }
1174 (None, None) => {
1175 return Ok(PredicateGroup::default());
1176 }
1177 };
1178
1179 let predicate = PredicateGroup::new(metadata, &exprs)?;
1180 Ok(predicate)
1181}
1182
1183fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
1184 let ts = ts
1185 .convert_to(ts_col_unit)
1186 .context(TimeRangePredicateOverflowSnafu {
1187 timestamp: ts,
1188 unit: ts_col_unit,
1189 })?;
1190 let val = ts.value();
1191 let scalar_value = match ts_col_unit {
1192 TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
1193 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
1194 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
1195 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
1196 };
1197 Ok(datafusion_expr::lit(scalar_value))
1198}
1199
1200fn get_expired_ssts(
1202 levels: &[LevelMeta],
1203 ttl: Option<TimeToLive>,
1204 now: Timestamp,
1205) -> Vec<FileHandle> {
1206 let Some(ttl) = ttl else {
1207 return vec![];
1208 };
1209
1210 levels
1211 .iter()
1212 .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
1213 .collect()
1214}
1215
1216fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
1219 picker_output
1220 .outputs
1221 .iter()
1222 .flat_map(|output| output.inputs.iter())
1223 .map(|file: &FileHandle| {
1224 let meta = file.meta_ref();
1225 meta.max_row_group_uncompressed_size
1226 })
1227 .sum()
1228}
1229
/// A compaction request that has been recorded but not yet scheduled.
struct PendingCompaction {
    /// Compaction options carried by the deferred request.
    pub(crate) options: compact_request::Options,
    /// Sender through which the outcome of the request is reported to its waiter.
    pub(crate) waiter: OptionOutputTx,
    /// Parallelism value stored with the request.
    /// NOTE(review): presumably applied when the request is eventually scheduled — confirm.
    pub(crate) max_parallelism: usize,
}
1240
1241#[cfg(test)]
1242mod tests {
1243 use std::assert_matches;
1244 use std::time::Duration;
1245
1246 use api::v1::region::StrictWindow;
1247 use common_datasource::compression::CompressionType;
1248 use common_meta::key::schema_name::SchemaNameValue;
1249 use common_time::DatabaseTimeToLive;
1250 use tokio::sync::{Barrier, oneshot};
1251
1252 use super::*;
1253 use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
1254 use crate::error::InvalidSchedulerStateSnafu;
1255 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1256 use crate::region::ManifestContext;
1257 use crate::schedule::scheduler::{Job, Scheduler};
1258 use crate::sst::FormatType;
1259 use crate::test_util::mock_schema_metadata_manager;
1260 use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1261 use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
1262
1263 struct FailingScheduler;
1264
1265 #[async_trait::async_trait]
1266 impl Scheduler for FailingScheduler {
1267 fn schedule(&self, _job: Job) -> Result<()> {
1268 InvalidSchedulerStateSnafu.fail()
1269 }
1270
1271 async fn stop(&self, _await_termination: bool) -> Result<()> {
1272 Ok(())
1273 }
1274 }
1275
1276 #[tokio::test]
1277 async fn test_find_compaction_options_db_level() {
1278 let env = SchedulerEnv::new().await;
1279 let builder = VersionControlBuilder::new();
1280 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1281 let region_id = builder.region_id();
1282 let table_id = region_id.table_id();
1283 let mut schema_value = SchemaNameValue {
1285 ttl: Some(DatabaseTimeToLive::default()),
1286 ..Default::default()
1287 };
1288 schema_value
1289 .extra_options
1290 .insert("compaction.type".to_string(), "twcs".to_string());
1291 schema_value
1292 .extra_options
1293 .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
1294 schema_metadata_manager
1295 .register_region_table_info(
1296 table_id,
1297 "t",
1298 "c",
1299 "s",
1300 Some(schema_value),
1301 kv_backend.clone(),
1302 )
1303 .await;
1304
1305 let version_control = Arc::new(builder.build());
1306 let region_opts = version_control.current().version.options.clone();
1307 let (opts, _) = find_dynamic_options(region_id, ®ion_opts, &schema_metadata_manager)
1308 .await
1309 .unwrap();
1310 match opts {
1311 crate::region::options::CompactionOptions::Twcs(t) => {
1312 assert_eq!(t.time_window_seconds(), Some(2 * 3600));
1313 }
1314 }
1315 let manifest_ctx = env
1316 .mock_manifest_context(version_control.current().version.metadata.clone())
1317 .await;
1318 let (tx, _rx) = mpsc::channel(4);
1319 let mut scheduler = env.mock_compaction_scheduler(tx);
1320 let (otx, _orx) = oneshot::channel();
1321 let request = scheduler
1322 .region_status
1323 .entry(region_id)
1324 .or_insert_with(|| {
1325 crate::compaction::CompactionStatus::new(
1326 region_id,
1327 version_control.clone(),
1328 env.access_layer.clone(),
1329 )
1330 })
1331 .new_compaction_request(
1332 scheduler.request_sender.clone(),
1333 OptionOutputTx::new(Some(OutputTx::new(otx))),
1334 scheduler.engine_config.clone(),
1335 scheduler.cache_manager.clone(),
1336 &manifest_ctx,
1337 scheduler.listener.clone(),
1338 schema_metadata_manager.clone(),
1339 1,
1340 );
1341 scheduler
1342 .schedule_compaction_request(
1343 request,
1344 compact_request::Options::Regular(Default::default()),
1345 )
1346 .await
1347 .unwrap();
1348 }
1349
1350 #[tokio::test]
1351 async fn test_find_compaction_options_priority() {
1352 fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
1353 let mut schema_value = SchemaNameValue {
1354 ttl: Some(DatabaseTimeToLive::default()),
1355 ..Default::default()
1356 };
1357 schema_value
1358 .extra_options
1359 .insert("compaction.type".to_string(), "twcs".to_string());
1360 schema_value.extra_options.insert(
1361 "compaction.twcs.time_window".to_string(),
1362 time_window.to_string(),
1363 );
1364 schema_value
1365 }
1366
1367 let cases = [
1368 (
1369 "db options set and table override set",
1370 Some(schema_value_with_twcs("2h")),
1371 true,
1372 Some(Duration::from_secs(5 * 3600)),
1373 Some(5 * 3600),
1374 ),
1375 (
1376 "db options set and table override not set",
1377 Some(schema_value_with_twcs("2h")),
1378 false,
1379 None,
1380 Some(2 * 3600),
1381 ),
1382 (
1383 "db options not set and table override set",
1384 None,
1385 true,
1386 Some(Duration::from_secs(4 * 3600)),
1387 Some(4 * 3600),
1388 ),
1389 (
1390 "db options not set and table override not set",
1391 None,
1392 false,
1393 None,
1394 None,
1395 ),
1396 ];
1397
1398 for (case_name, schema_value, override_set, table_window, expected_window) in cases {
1399 let builder = VersionControlBuilder::new();
1400 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1401 let region_id = builder.region_id();
1402 let table_id = region_id.table_id();
1403 schema_metadata_manager
1404 .register_region_table_info(
1405 table_id,
1406 "t",
1407 "c",
1408 "s",
1409 schema_value,
1410 kv_backend.clone(),
1411 )
1412 .await;
1413
1414 let version_control = Arc::new(builder.build());
1415 let mut region_opts = version_control.current().version.options.clone();
1416 region_opts.compaction_override = override_set;
1417 if let Some(window) = table_window {
1418 let crate::region::options::CompactionOptions::Twcs(twcs) =
1419 &mut region_opts.compaction;
1420 twcs.time_window = Some(window);
1421 }
1422
1423 let (opts, _) = find_dynamic_options(region_id, ®ion_opts, &schema_metadata_manager)
1424 .await
1425 .unwrap();
1426 match opts {
1427 crate::region::options::CompactionOptions::Twcs(t) => {
1428 assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
1429 }
1430 }
1431 }
1432 }
1433
1434 #[tokio::test]
1435 async fn test_schedule_empty() {
1436 let env = SchedulerEnv::new().await;
1437 let (tx, _rx) = mpsc::channel(4);
1438 let mut scheduler = env.mock_compaction_scheduler(tx);
1439 let mut builder = VersionControlBuilder::new();
1440 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1441 schema_metadata_manager
1442 .register_region_table_info(
1443 builder.region_id().table_id(),
1444 "test_table",
1445 "test_catalog",
1446 "test_schema",
1447 None,
1448 kv_backend,
1449 )
1450 .await;
1451 let version_control = Arc::new(builder.build());
1453 let (output_tx, output_rx) = oneshot::channel();
1454 let waiter = OptionOutputTx::from(output_tx);
1455 let manifest_ctx = env
1456 .mock_manifest_context(version_control.current().version.metadata.clone())
1457 .await;
1458 let scheduled = scheduler
1459 .schedule_compaction(
1460 builder.region_id(),
1461 compact_request::Options::Regular(Default::default()),
1462 &version_control,
1463 &env.access_layer,
1464 waiter,
1465 &manifest_ctx,
1466 schema_metadata_manager.clone(),
1467 1,
1468 )
1469 .await
1470 .unwrap();
1471 assert!(!scheduled);
1472 let output = output_rx.await.unwrap().unwrap();
1473 assert_eq!(output, 0);
1474 assert!(scheduler.region_status.is_empty());
1475
1476 let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
1478 let (output_tx, output_rx) = oneshot::channel();
1479 let waiter = OptionOutputTx::from(output_tx);
1480 let scheduled = scheduler
1481 .schedule_compaction(
1482 builder.region_id(),
1483 compact_request::Options::Regular(Default::default()),
1484 &version_control,
1485 &env.access_layer,
1486 waiter,
1487 &manifest_ctx,
1488 schema_metadata_manager,
1489 1,
1490 )
1491 .await
1492 .unwrap();
1493 assert!(!scheduled);
1494 let output = output_rx.await.unwrap().unwrap();
1495 assert_eq!(output, 0);
1496 assert!(scheduler.region_status.is_empty());
1497 }
1498
1499 #[tokio::test]
1500 async fn test_schedule_compaction_returns_true_when_task_scheduled() {
1501 let job_scheduler = Arc::new(VecScheduler::default());
1502 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1503 let (tx, _rx) = mpsc::channel(4);
1504 let mut scheduler = env.mock_compaction_scheduler(tx);
1505 let mut builder = VersionControlBuilder::new();
1506 let region_id = builder.region_id();
1507 let end = 1000 * 1000;
1508 let version_control = Arc::new(
1510 builder
1511 .push_l0_file(0, end)
1512 .push_l0_file(10, end)
1513 .push_l0_file(50, end)
1514 .push_l0_file(80, end)
1515 .push_l0_file(90, end)
1516 .build(),
1517 );
1518 let manifest_ctx = env
1519 .mock_manifest_context(version_control.current().version.metadata.clone())
1520 .await;
1521 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1522 schema_metadata_manager
1523 .register_region_table_info(
1524 region_id.table_id(),
1525 "test_table",
1526 "test_catalog",
1527 "test_schema",
1528 None,
1529 kv_backend,
1530 )
1531 .await;
1532
1533 let scheduled = scheduler
1534 .schedule_compaction(
1535 region_id,
1536 Options::Regular(Default::default()),
1537 &version_control,
1538 &env.access_layer,
1539 OptionOutputTx::none(),
1540 &manifest_ctx,
1541 schema_metadata_manager,
1542 1,
1543 )
1544 .await
1545 .unwrap();
1546
1547 assert!(scheduled);
1550 assert_eq!(1, job_scheduler.num_jobs());
1551 assert!(scheduler.region_status.contains_key(®ion_id));
1552 }
1553
1554 #[tokio::test]
1555 async fn test_schedule_on_finished() {
1556 common_telemetry::init_default_ut_logging();
1557 let job_scheduler = Arc::new(VecScheduler::default());
1558 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1559 let (tx, _rx) = mpsc::channel(4);
1560 let mut scheduler = env.mock_compaction_scheduler(tx);
1561 let mut builder = VersionControlBuilder::new();
1562 let purger = builder.file_purger();
1563 let region_id = builder.region_id();
1564
1565 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1566 schema_metadata_manager
1567 .register_region_table_info(
1568 builder.region_id().table_id(),
1569 "test_table",
1570 "test_catalog",
1571 "test_schema",
1572 None,
1573 kv_backend,
1574 )
1575 .await;
1576
1577 let end = 1000 * 1000;
1579 let version_control = Arc::new(
1580 builder
1581 .push_l0_file(0, end)
1582 .push_l0_file(10, end)
1583 .push_l0_file(50, end)
1584 .push_l0_file(80, end)
1585 .push_l0_file(90, end)
1586 .build(),
1587 );
1588 let manifest_ctx = env
1589 .mock_manifest_context(version_control.current().version.metadata.clone())
1590 .await;
1591 let scheduled = scheduler
1592 .schedule_compaction(
1593 region_id,
1594 compact_request::Options::Regular(Default::default()),
1595 &version_control,
1596 &env.access_layer,
1597 OptionOutputTx::none(),
1598 &manifest_ctx,
1599 schema_metadata_manager.clone(),
1600 1,
1601 )
1602 .await
1603 .unwrap();
1604 assert!(scheduled);
1606 assert_eq!(1, scheduler.region_status.len());
1607 assert_eq!(1, job_scheduler.num_jobs());
1608 let data = version_control.current();
1609 let file_metas: Vec<_> = data.version.ssts.levels()[0]
1610 .files
1611 .values()
1612 .map(|file| file.meta_ref().clone())
1613 .collect();
1614
1615 apply_edit(
1617 &version_control,
1618 &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1619 &file_metas,
1620 purger.clone(),
1621 );
1622 let (tx, _rx) = oneshot::channel();
1624 let scheduled = scheduler
1625 .schedule_compaction(
1626 region_id,
1627 compact_request::Options::Regular(Default::default()),
1628 &version_control,
1629 &env.access_layer,
1630 OptionOutputTx::new(Some(OutputTx::new(tx))),
1631 &manifest_ctx,
1632 schema_metadata_manager.clone(),
1633 1,
1634 )
1635 .await
1636 .unwrap();
1637 assert!(!scheduled);
1638 assert_eq!(1, scheduler.region_status.len());
1639 assert_eq!(1, job_scheduler.num_jobs());
1640 assert!(
1641 !scheduler
1642 .region_status
1643 .get(&builder.region_id())
1644 .unwrap()
1645 .waiters
1646 .is_empty()
1647 );
1648
1649 scheduler
1651 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
1652 .await;
1653 let scheduled = scheduler
1654 .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager.clone())
1655 .await;
1656 assert!(scheduled);
1657 assert_eq!(1, scheduler.region_status.len());
1658 assert_eq!(2, job_scheduler.num_jobs());
1659
1660 apply_edit(
1662 &version_control,
1663 &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1664 &[],
1665 purger.clone(),
1666 );
1667 let (tx, _rx) = oneshot::channel();
1668 let scheduled = scheduler
1670 .schedule_compaction(
1671 region_id,
1672 compact_request::Options::Regular(Default::default()),
1673 &version_control,
1674 &env.access_layer,
1675 OptionOutputTx::new(Some(OutputTx::new(tx))),
1676 &manifest_ctx,
1677 schema_metadata_manager,
1678 1,
1679 )
1680 .await
1681 .unwrap();
1682 assert!(!scheduled);
1683 assert_eq!(2, job_scheduler.num_jobs());
1684 assert!(
1685 !scheduler
1686 .region_status
1687 .get(&builder.region_id())
1688 .unwrap()
1689 .waiters
1690 .is_empty()
1691 );
1692 }
1693
1694 #[tokio::test]
1695 async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() {
1696 common_telemetry::init_default_ut_logging();
1697 let env = SchedulerEnv::new()
1698 .await
1699 .scheduler(Arc::new(FailingScheduler));
1700 let (tx, _rx) = mpsc::channel(4);
1701 let mut scheduler = env.mock_compaction_scheduler(tx);
1702 let mut builder = VersionControlBuilder::new();
1703 let end = 1000 * 1000;
1704 let version_control = Arc::new(
1705 builder
1706 .push_l0_file(0, end)
1707 .push_l0_file(10, end)
1708 .push_l0_file(50, end)
1709 .push_l0_file(80, end)
1710 .push_l0_file(90, end)
1711 .build(),
1712 );
1713 let region_id = builder.region_id();
1714 let manifest_ctx = env
1715 .mock_manifest_context(version_control.current().version.metadata.clone())
1716 .await;
1717 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1718 schema_metadata_manager
1719 .register_region_table_info(
1720 builder.region_id().table_id(),
1721 "test_table",
1722 "test_catalog",
1723 "test_schema",
1724 None,
1725 kv_backend,
1726 )
1727 .await;
1728
1729 let result = scheduler
1730 .schedule_compaction(
1731 region_id,
1732 compact_request::Options::Regular(Default::default()),
1733 &version_control,
1734 &env.access_layer,
1735 OptionOutputTx::none(),
1736 &manifest_ctx,
1737 schema_metadata_manager,
1738 1,
1739 )
1740 .await;
1741
1742 assert!(result.is_err());
1743 assert!(!scheduler.region_status.contains_key(®ion_id));
1744 }
1745
1746 #[tokio::test]
1747 async fn test_manual_compaction_when_compaction_in_progress() {
1748 common_telemetry::init_default_ut_logging();
1749 let job_scheduler = Arc::new(VecScheduler::default());
1750 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1751 let (tx, _rx) = mpsc::channel(4);
1752 let mut scheduler = env.mock_compaction_scheduler(tx);
1753 let mut builder = VersionControlBuilder::new();
1754 let purger = builder.file_purger();
1755 let region_id = builder.region_id();
1756
1757 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1758 schema_metadata_manager
1759 .register_region_table_info(
1760 builder.region_id().table_id(),
1761 "test_table",
1762 "test_catalog",
1763 "test_schema",
1764 None,
1765 kv_backend,
1766 )
1767 .await;
1768
1769 let end = 1000 * 1000;
1771 let version_control = Arc::new(
1772 builder
1773 .push_l0_file(0, end)
1774 .push_l0_file(10, end)
1775 .push_l0_file(50, end)
1776 .push_l0_file(80, end)
1777 .push_l0_file(90, end)
1778 .build(),
1779 );
1780 let manifest_ctx = env
1781 .mock_manifest_context(version_control.current().version.metadata.clone())
1782 .await;
1783
1784 let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
1785 .files
1786 .values()
1787 .map(|file| file.meta_ref().clone())
1788 .collect();
1789
1790 apply_edit(
1792 &version_control,
1793 &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1794 &file_metas,
1795 purger.clone(),
1796 );
1797
1798 scheduler
1799 .schedule_compaction(
1800 region_id,
1801 compact_request::Options::Regular(Default::default()),
1802 &version_control,
1803 &env.access_layer,
1804 OptionOutputTx::none(),
1805 &manifest_ctx,
1806 schema_metadata_manager.clone(),
1807 1,
1808 )
1809 .await
1810 .unwrap();
1811 assert_eq!(1, scheduler.region_status.len());
1813 assert_eq!(1, job_scheduler.num_jobs());
1814 assert!(
1815 scheduler
1816 .region_status
1817 .get(®ion_id)
1818 .unwrap()
1819 .pending_request
1820 .is_none()
1821 );
1822
1823 let (tx, _rx) = oneshot::channel();
1825 scheduler
1826 .schedule_compaction(
1827 region_id,
1828 compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
1829 &version_control,
1830 &env.access_layer,
1831 OptionOutputTx::new(Some(OutputTx::new(tx))),
1832 &manifest_ctx,
1833 schema_metadata_manager.clone(),
1834 1,
1835 )
1836 .await
1837 .unwrap();
1838 assert_eq!(1, scheduler.region_status.len());
1839 assert_eq!(1, job_scheduler.num_jobs());
1841 let status = scheduler.region_status.get(&builder.region_id()).unwrap();
1842 assert!(status.pending_request.is_some());
1843
1844 scheduler
1846 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
1847 .await;
1848 assert_eq!(1, scheduler.region_status.len());
1849 assert_eq!(2, job_scheduler.num_jobs());
1850
1851 let status = scheduler.region_status.get(&builder.region_id()).unwrap();
1852 assert!(status.pending_request.is_none());
1853 }
1854
1855 #[tokio::test]
1856 async fn test_compaction_bypass_in_staging_mode() {
1857 let env = SchedulerEnv::new().await;
1858 let (tx, _rx) = mpsc::channel(4);
1859 let mut scheduler = env.mock_compaction_scheduler(tx);
1860
1861 let builder = VersionControlBuilder::new();
1863 let version_control = Arc::new(builder.build());
1864 let region_id = version_control.current().version.metadata.region_id;
1865
1866 let staging_manifest_ctx = {
1868 let manager = RegionManifestManager::new(
1869 version_control.current().version.metadata.clone(),
1870 0,
1871 RegionManifestOptions {
1872 manifest_dir: "".to_string(),
1873 object_store: env.access_layer.object_store().clone(),
1874 compress_type: CompressionType::Uncompressed,
1875 checkpoint_distance: 10,
1876 remove_file_options: Default::default(),
1877 manifest_cache: None,
1878 },
1879 FormatType::PrimaryKey,
1880 &Default::default(),
1881 )
1882 .await
1883 .unwrap();
1884 Arc::new(ManifestContext::new(
1885 manager,
1886 RegionRoleState::Leader(RegionLeaderState::Staging),
1887 ))
1888 };
1889
1890 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1891
1892 let (tx, rx) = oneshot::channel();
1894 scheduler
1895 .schedule_compaction(
1896 region_id,
1897 compact_request::Options::Regular(Default::default()),
1898 &version_control,
1899 &env.access_layer,
1900 OptionOutputTx::new(Some(OutputTx::new(tx))),
1901 &staging_manifest_ctx,
1902 schema_metadata_manager,
1903 1,
1904 )
1905 .await
1906 .unwrap();
1907
1908 let result = rx.await.unwrap();
1909 assert_eq!(result.unwrap(), 0); assert_eq!(0, scheduler.region_status.len());
1911 }
1912
1913 #[tokio::test]
1914 async fn test_add_ddl_request_to_pending() {
1915 let env = SchedulerEnv::new().await;
1916 let (tx, _rx) = mpsc::channel(4);
1917 let mut scheduler = env.mock_compaction_scheduler(tx);
1918 let builder = VersionControlBuilder::new();
1919 let version_control = Arc::new(builder.build());
1920 let region_id = builder.region_id();
1921
1922 scheduler.region_status.insert(
1923 region_id,
1924 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1925 );
1926 scheduler
1927 .region_status
1928 .get_mut(®ion_id)
1929 .unwrap()
1930 .start_local_task();
1931
1932 let (output_tx, _output_rx) = oneshot::channel();
1933 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1934 region_id,
1935 sender: OptionOutputTx::from(output_tx),
1936 request: crate::request::DdlRequest::EnterStaging(
1937 store_api::region_request::EnterStagingRequest {
1938 partition_directive:
1939 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1940 },
1941 ),
1942 });
1943
1944 assert!(scheduler.has_pending_ddls(region_id));
1945 }
1946
1947 #[tokio::test]
1948 async fn test_request_cancel_state_transitions() {
1949 let env = SchedulerEnv::new().await;
1950 let builder = VersionControlBuilder::new();
1951 let region_id = builder.region_id();
1952 let version_control = Arc::new(builder.build());
1953 let mut status =
1954 CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1955 let state = status.start_local_task();
1956
1957 assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued);
1958 assert!(state.cancel_handle().is_cancelled());
1959 assert_eq!(
1960 status.request_cancel(),
1961 RequestCancelResult::AlreadyCancelling
1962 );
1963
1964 assert!(!state.mark_commit_started());
1965 assert_eq!(
1966 status.request_cancel(),
1967 RequestCancelResult::AlreadyCancelling
1968 );
1969
1970 assert!(status.clear_running_task());
1971 assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning);
1972 }
1973
1974 #[tokio::test]
1975 async fn test_request_cancel_remote_compaction_is_too_late() {
1976 let env = SchedulerEnv::new().await;
1977 let builder = VersionControlBuilder::new();
1978 let region_id = builder.region_id();
1979 let version_control = Arc::new(builder.build());
1980 let mut status =
1981 CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1982
1983 status.start_remote_task();
1984
1985 assert_eq!(
1986 status.request_cancel(),
1987 RequestCancelResult::TooLateToCancel
1988 );
1989 assert!(status.active_compaction.is_some());
1990 }
1991
1992 #[tokio::test]
1993 async fn test_on_compaction_cancelled_returns_pending_ddl_requests() {
1994 let job_scheduler = Arc::new(VecScheduler::default());
1995 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1996 let (tx, _rx) = mpsc::channel(4);
1997 let mut scheduler = env.mock_compaction_scheduler(tx);
1998 let builder = VersionControlBuilder::new();
1999 let version_control = Arc::new(builder.build());
2000 let region_id = builder.region_id();
2001 let _manifest_ctx = env
2002 .mock_manifest_context(version_control.current().version.metadata.clone())
2003 .await;
2004 let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2005
2006 scheduler.region_status.insert(
2007 region_id,
2008 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2009 );
2010 scheduler
2011 .region_status
2012 .get_mut(®ion_id)
2013 .unwrap()
2014 .start_local_task();
2015
2016 let (output_tx, _output_rx) = oneshot::channel();
2017 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2018 region_id,
2019 sender: OptionOutputTx::from(output_tx),
2020 request: crate::request::DdlRequest::EnterStaging(
2021 store_api::region_request::EnterStagingRequest {
2022 partition_directive:
2023 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2024 },
2025 ),
2026 });
2027
2028 let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
2029
2030 assert_eq!(pending_ddls.len(), 1);
2031 assert!(!scheduler.has_pending_ddls(region_id));
2032 assert!(!scheduler.region_status.contains_key(®ion_id));
2033 assert_eq!(job_scheduler.num_jobs(), 0);
2034 }
2035
2036 #[tokio::test]
2037 async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() {
2038 let job_scheduler = Arc::new(VecScheduler::default());
2039 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
2040 let (tx, _rx) = mpsc::channel(4);
2041 let mut scheduler = env.mock_compaction_scheduler(tx);
2042 let builder = VersionControlBuilder::new();
2043 let version_control = Arc::new(builder.build());
2044 let region_id = builder.region_id();
2045 let _manifest_ctx = env
2046 .mock_manifest_context(version_control.current().version.metadata.clone())
2047 .await;
2048 let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2049
2050 scheduler.region_status.insert(
2051 region_id,
2052 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2053 );
2054 let status = scheduler.region_status.get_mut(®ion_id).unwrap();
2055 status.start_local_task();
2056 let (manual_tx, manual_rx) = oneshot::channel();
2057 status.set_pending_request(PendingCompaction {
2058 options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
2059 waiter: OptionOutputTx::from(manual_tx),
2060 max_parallelism: 1,
2061 });
2062
2063 let (output_tx, _output_rx) = oneshot::channel();
2064 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2065 region_id,
2066 sender: OptionOutputTx::from(output_tx),
2067 request: crate::request::DdlRequest::EnterStaging(
2068 store_api::region_request::EnterStagingRequest {
2069 partition_directive:
2070 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2071 },
2072 ),
2073 });
2074
2075 let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
2076
2077 assert_eq!(pending_ddls.len(), 1);
2078 assert!(!scheduler.region_status.contains_key(®ion_id));
2079 assert_eq!(job_scheduler.num_jobs(), 0);
2080 assert_matches!(manual_rx.await.unwrap(), Err(_));
2081 }
2082
2083 #[tokio::test]
2084 async fn test_pending_ddl_request_failed_on_compaction_failed() {
2085 let env = SchedulerEnv::new().await;
2086 let (tx, _rx) = mpsc::channel(4);
2087 let mut scheduler = env.mock_compaction_scheduler(tx);
2088 let builder = VersionControlBuilder::new();
2089 let version_control = Arc::new(builder.build());
2090 let region_id = builder.region_id();
2091
2092 scheduler.region_status.insert(
2093 region_id,
2094 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2095 );
2096
2097 let (output_tx, output_rx) = oneshot::channel();
2098 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2099 region_id,
2100 sender: OptionOutputTx::from(output_tx),
2101 request: crate::request::DdlRequest::EnterStaging(
2102 store_api::region_request::EnterStagingRequest {
2103 partition_directive:
2104 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2105 },
2106 ),
2107 });
2108
2109 assert!(scheduler.has_pending_ddls(region_id));
2110 scheduler
2111 .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
2112
2113 assert!(!scheduler.has_pending_ddls(region_id));
2114 let result = output_rx.await.unwrap();
2115 assert_matches!(result, Err(_));
2116 }
2117
2118 #[tokio::test]
2119 async fn test_pending_ddl_request_failed_on_region_closed() {
2120 let env = SchedulerEnv::new().await;
2121 let (tx, _rx) = mpsc::channel(4);
2122 let mut scheduler = env.mock_compaction_scheduler(tx);
2123 let builder = VersionControlBuilder::new();
2124 let version_control = Arc::new(builder.build());
2125 let region_id = builder.region_id();
2126
2127 scheduler.region_status.insert(
2128 region_id,
2129 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2130 );
2131
2132 let (output_tx, output_rx) = oneshot::channel();
2133 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2134 region_id,
2135 sender: OptionOutputTx::from(output_tx),
2136 request: crate::request::DdlRequest::EnterStaging(
2137 store_api::region_request::EnterStagingRequest {
2138 partition_directive:
2139 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2140 },
2141 ),
2142 });
2143
2144 assert!(scheduler.has_pending_ddls(region_id));
2145 scheduler.on_region_closed(region_id);
2146
2147 assert!(!scheduler.has_pending_ddls(region_id));
2148 let result = output_rx.await.unwrap();
2149 assert_matches!(result, Err(_));
2150 }
2151
2152 #[tokio::test]
2153 async fn test_pending_ddl_request_failed_on_region_dropped() {
2154 let env = SchedulerEnv::new().await;
2155 let (tx, _rx) = mpsc::channel(4);
2156 let mut scheduler = env.mock_compaction_scheduler(tx);
2157 let builder = VersionControlBuilder::new();
2158 let version_control = Arc::new(builder.build());
2159 let region_id = builder.region_id();
2160
2161 scheduler.region_status.insert(
2162 region_id,
2163 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2164 );
2165
2166 let (output_tx, output_rx) = oneshot::channel();
2167 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2168 region_id,
2169 sender: OptionOutputTx::from(output_tx),
2170 request: crate::request::DdlRequest::EnterStaging(
2171 store_api::region_request::EnterStagingRequest {
2172 partition_directive:
2173 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2174 },
2175 ),
2176 });
2177
2178 assert!(scheduler.has_pending_ddls(region_id));
2179 scheduler.on_region_dropped(region_id);
2180
2181 assert!(!scheduler.has_pending_ddls(region_id));
2182 let result = output_rx.await.unwrap();
2183 assert_matches!(result, Err(_));
2184 }
2185
2186 #[tokio::test]
2187 async fn test_pending_ddl_request_failed_on_region_truncated() {
2188 let env = SchedulerEnv::new().await;
2189 let (tx, _rx) = mpsc::channel(4);
2190 let mut scheduler = env.mock_compaction_scheduler(tx);
2191 let builder = VersionControlBuilder::new();
2192 let version_control = Arc::new(builder.build());
2193 let region_id = builder.region_id();
2194
2195 scheduler.region_status.insert(
2196 region_id,
2197 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2198 );
2199
2200 let (output_tx, output_rx) = oneshot::channel();
2201 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2202 region_id,
2203 sender: OptionOutputTx::from(output_tx),
2204 request: crate::request::DdlRequest::EnterStaging(
2205 store_api::region_request::EnterStagingRequest {
2206 partition_directive:
2207 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2208 },
2209 ),
2210 });
2211
2212 assert!(scheduler.has_pending_ddls(region_id));
2213 scheduler.on_region_truncated(region_id);
2214
2215 assert!(!scheduler.has_pending_ddls(region_id));
2216 let result = output_rx.await.unwrap();
2217 assert_matches!(result, Err(_));
2218 }
2219
2220 #[tokio::test]
2221 async fn test_on_compaction_finished_returns_pending_ddl_requests() {
2222 let job_scheduler = Arc::new(VecScheduler::default());
2223 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
2224 let (tx, _rx) = mpsc::channel(4);
2225 let mut scheduler = env.mock_compaction_scheduler(tx);
2226 let builder = VersionControlBuilder::new();
2227 let version_control = Arc::new(builder.build());
2228 let region_id = builder.region_id();
2229 let manifest_ctx = env
2230 .mock_manifest_context(version_control.current().version.metadata.clone())
2231 .await;
2232 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2233
2234 scheduler.region_status.insert(
2235 region_id,
2236 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2237 );
2238 scheduler
2239 .region_status
2240 .get_mut(®ion_id)
2241 .unwrap()
2242 .start_local_task();
2243
2244 let (output_tx, _output_rx) = oneshot::channel();
2245 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2246 region_id,
2247 sender: OptionOutputTx::from(output_tx),
2248 request: crate::request::DdlRequest::EnterStaging(
2249 store_api::region_request::EnterStagingRequest {
2250 partition_directive:
2251 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2252 },
2253 ),
2254 });
2255
2256 let pending_ddls = scheduler
2257 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2258 .await;
2259
2260 assert_eq!(pending_ddls.len(), 1);
2261 assert!(!scheduler.has_pending_ddls(region_id));
2262 assert!(!scheduler.region_status.contains_key(®ion_id));
2263 assert_eq!(job_scheduler.num_jobs(), 0);
2264 }
2265
2266 #[tokio::test]
2267 async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
2268 let env = SchedulerEnv::new().await;
2269 let (tx, _rx) = mpsc::channel(4);
2270 let mut scheduler = env.mock_compaction_scheduler(tx);
2271 let builder = VersionControlBuilder::new();
2272 let version_control = Arc::new(builder.build());
2273 let region_id = builder.region_id();
2274 let manifest_ctx = env
2275 .mock_manifest_context(version_control.current().version.metadata.clone())
2276 .await;
2277 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2278
2279 let (manual_tx, manual_rx) = oneshot::channel();
2280 let mut status =
2281 CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
2282 status.start_local_task();
2283 status.set_pending_request(PendingCompaction {
2284 options: compact_request::Options::Regular(Default::default()),
2285 waiter: OptionOutputTx::from(manual_tx),
2286 max_parallelism: 1,
2287 });
2288 scheduler.region_status.insert(region_id, status);
2289
2290 let (ddl_tx, _ddl_rx) = oneshot::channel();
2291 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2292 region_id,
2293 sender: OptionOutputTx::from(ddl_tx),
2294 request: crate::request::DdlRequest::EnterStaging(
2295 store_api::region_request::EnterStagingRequest {
2296 partition_directive:
2297 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2298 },
2299 ),
2300 });
2301
2302 let pending_ddls = scheduler
2303 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2304 .await;
2305
2306 assert_eq!(pending_ddls.len(), 1);
2307 assert!(!scheduler.region_status.contains_key(®ion_id));
2308 assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
2309 }
2310
2311 #[tokio::test]
2312 async fn test_on_compaction_finished_returns_empty_when_region_absent() {
2313 let env = SchedulerEnv::new().await;
2314 let (tx, _rx) = mpsc::channel(4);
2315 let mut scheduler = env.mock_compaction_scheduler(tx);
2316 let builder = VersionControlBuilder::new();
2317 let region_id = builder.region_id();
2318 let version_control = Arc::new(builder.build());
2319 let manifest_ctx = env
2320 .mock_manifest_context(version_control.current().version.metadata.clone())
2321 .await;
2322 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2323
2324 let pending_ddls = scheduler
2325 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2326 .await;
2327
2328 assert!(pending_ddls.is_empty());
2329 }
2330
2331 #[tokio::test]
2332 async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
2333 let env = SchedulerEnv::new()
2334 .await
2335 .scheduler(Arc::new(FailingScheduler));
2336 let (tx, _rx) = mpsc::channel(4);
2337 let mut scheduler = env.mock_compaction_scheduler(tx);
2338 let mut builder = VersionControlBuilder::new();
2339 let end = 1000 * 1000;
2340 let version_control = Arc::new(
2341 builder
2342 .push_l0_file(0, end)
2343 .push_l0_file(10, end)
2344 .push_l0_file(50, end)
2345 .push_l0_file(80, end)
2346 .push_l0_file(90, end)
2347 .build(),
2348 );
2349 let region_id = builder.region_id();
2350 let manifest_ctx = env
2351 .mock_manifest_context(version_control.current().version.metadata.clone())
2352 .await;
2353 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2354
2355 let (manual_tx, manual_rx) = oneshot::channel();
2356 let mut status =
2357 CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
2358 status.start_local_task();
2359 status.set_pending_request(PendingCompaction {
2360 options: compact_request::Options::Regular(Default::default()),
2361 waiter: OptionOutputTx::from(manual_tx),
2362 max_parallelism: 1,
2363 });
2364 scheduler.region_status.insert(region_id, status);
2365
2366 let (ddl_tx, ddl_rx) = oneshot::channel();
2367 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2368 region_id,
2369 sender: OptionOutputTx::from(ddl_tx),
2370 request: crate::request::DdlRequest::EnterStaging(
2371 store_api::region_request::EnterStagingRequest {
2372 partition_directive:
2373 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2374 },
2375 ),
2376 });
2377
2378 let pending_ddls = scheduler
2379 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2380 .await;
2381
2382 assert!(pending_ddls.is_empty());
2383 assert!(!scheduler.region_status.contains_key(®ion_id));
2384 assert!(manual_rx.await.is_err());
2385 assert_matches!(ddl_rx.await.unwrap(), Err(_));
2386 }
2387
2388 #[tokio::test]
2389 async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
2390 let env = SchedulerEnv::new().await;
2391 let (tx, _rx) = mpsc::channel(4);
2392 let mut scheduler = env.mock_compaction_scheduler(tx);
2393 let builder = VersionControlBuilder::new();
2394 let version_control = Arc::new(builder.build());
2395 let region_id = builder.region_id();
2396 let manifest_ctx = env
2397 .mock_manifest_context(version_control.current().version.metadata.clone())
2398 .await;
2399 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2400
2401 scheduler.region_status.insert(
2402 region_id,
2403 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2404 );
2405 scheduler
2406 .region_status
2407 .get_mut(®ion_id)
2408 .unwrap()
2409 .start_local_task();
2410
2411 let pending_ddls = scheduler
2412 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2413 .await;
2414
2415 assert!(pending_ddls.is_empty());
2416 assert!(scheduler.region_status.contains_key(®ion_id));
2417
2418 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2419 let scheduled = scheduler
2422 .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager)
2423 .await;
2424 assert!(!scheduled);
2425 assert!(!scheduler.region_status.contains_key(®ion_id));
2426 }
2427
2428 #[tokio::test]
2429 async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
2430 let env = SchedulerEnv::new()
2431 .await
2432 .scheduler(Arc::new(FailingScheduler));
2433 let (tx, _rx) = mpsc::channel(4);
2434 let mut scheduler = env.mock_compaction_scheduler(tx);
2435 let mut builder = VersionControlBuilder::new();
2436 let end = 1000 * 1000;
2437 let version_control = Arc::new(
2438 builder
2439 .push_l0_file(0, end)
2440 .push_l0_file(10, end)
2441 .push_l0_file(50, end)
2442 .push_l0_file(80, end)
2443 .push_l0_file(90, end)
2444 .build(),
2445 );
2446 let region_id = builder.region_id();
2447 let manifest_ctx = env
2448 .mock_manifest_context(version_control.current().version.metadata.clone())
2449 .await;
2450 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2451
2452 scheduler.region_status.insert(
2453 region_id,
2454 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2455 );
2456 scheduler
2457 .region_status
2458 .get_mut(®ion_id)
2459 .unwrap()
2460 .start_local_task();
2461
2462 let pending_ddls = scheduler
2463 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2464 .await;
2465
2466 assert!(pending_ddls.is_empty());
2467 assert!(scheduler.region_status.contains_key(®ion_id));
2468
2469 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2470 let scheduled = scheduler
2472 .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager)
2473 .await;
2474 assert!(!scheduled);
2475 assert!(!scheduler.region_status.contains_key(®ion_id));
2476 }
2477
2478 #[tokio::test]
2479 async fn test_concurrent_memory_competition() {
2480 let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); let barrier = Arc::new(Barrier::new(3));
2482 let mut handles = vec![];
2483
2484 for _i in 0..3 {
2486 let mgr = manager.clone();
2487 let bar = barrier.clone();
2488 let handle = tokio::spawn(async move {
2489 bar.wait().await; mgr.try_acquire(2 * 1024 * 1024)
2491 });
2492 handles.push(handle);
2493 }
2494
2495 let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
2496 .await
2497 .into_iter()
2498 .map(|r| r.unwrap())
2499 .collect();
2500
2501 let succeeded = results.iter().filter(|r| r.is_some()).count();
2503 let failed = results.iter().filter(|r| r.is_none()).count();
2504
2505 assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
2506 assert_eq!(failed, 2, "Expected 2 tasks to fail");
2507
2508 drop(results);
2510 assert_eq!(manager.used_bytes(), 0);
2511 }
2512}