From 55ae5e5b66f916e5227e6ff7de3b2b929adce169 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sun, 17 Sep 2023 17:15:11 +0800 Subject: [PATCH] feat(mito): Implements compaction scheduler (#2413) * feat: allow multiple waiters in compaction request * feat: compaction status wip * feat: track region status in compaction scheduler * feat: impl compaction scheduler * feat: call compaction scheduler * feat: remove status if nothing to compact * feat: schedule compaction after flush * feat: set compacting to false after compaction finished * refactor: flush status only needs region id and version control * refactor: schedule_compaction don't need region as argument * test: test flush/scheduler for empty requests * test: trigger compaction in test * feat: notify scheduler on truncated * chore: Apply suggestions from code review Co-authored-by: JeremyHi --------- Co-authored-by: JeremyHi --- src/mito2/src/compaction.rs | 339 ++++++++++++++++++++-- src/mito2/src/compaction/twcs.rs | 23 +- src/mito2/src/engine/compaction_test.rs | 16 +- src/mito2/src/error.rs | 6 +- src/mito2/src/flush.rs | 99 +++++-- src/mito2/src/memtable.rs | 56 +--- src/mito2/src/read/scan_region.rs | 7 + src/mito2/src/read/seq_scan.rs | 5 + src/mito2/src/request.rs | 18 +- src/mito2/src/test_util.rs | 4 + src/mito2/src/test_util/memtable_util.rs | 81 ++++++ src/mito2/src/test_util/scheduler_util.rs | 77 +++++ src/mito2/src/test_util/version_util.rs | 115 ++++++++ src/mito2/src/worker.rs | 2 +- src/mito2/src/worker/handle_alter.rs | 5 +- src/mito2/src/worker/handle_close.rs | 2 + src/mito2/src/worker/handle_compaction.rs | 36 +-- src/mito2/src/worker/handle_drop.rs | 2 + src/mito2/src/worker/handle_flush.rs | 37 ++- src/mito2/src/worker/handle_truncate.rs | 5 +- 20 files changed, 768 insertions(+), 167 deletions(-) create mode 100644 src/mito2/src/test_util/memtable_util.rs create mode 100644 src/mito2/src/test_util/scheduler_util.rs create mode 100644 src/mito2/src/test_util/version_util.rs diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 08b64432a7..8aff441487 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -18,19 +18,23 @@ mod picker; mod test_util; mod twcs; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use common_telemetry::debug; +use common_telemetry::{debug, error}; pub use picker::CompactionPickerRef; +use snafu::ResultExt; use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions}; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::compaction::twcs::TwcsPicker; -use crate::error::Result; -use crate::region::version::VersionRef; -use crate::request::{OptionOutputTx, WorkerRequest}; +use crate::error::{ + CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, +}; +use crate::region::version::{VersionControlRef, VersionRef}; +use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::FilePurgerRef; @@ -40,8 +44,10 @@ pub struct CompactionRequest { pub(crate) access_layer: AccessLayerRef, pub(crate) ttl: Option, pub(crate) compaction_time_window: Option, + /// Sender to send notification to the region worker. pub(crate) request_sender: mpsc::Sender, - pub(crate) waiter: OptionOutputTx, + /// Waiters of the compaction request. 
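+ /// All of these waiters are notified once the compaction finishes or fails.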
+ pub(crate) waiters: Vec, pub(crate) file_purger: FilePurgerRef, } @@ -49,6 +55,13 @@ impl CompactionRequest { pub(crate) fn region_id(&self) -> RegionId { self.current_version.metadata.region_id } + + /// Push waiter to the request. + pub(crate) fn push_waiter(&mut self, mut waiter: OptionOutputTx) { + if let Some(waiter) = waiter.take_inner() { + self.waiters.push(waiter); + } + } } /// Builds compaction picker according to [CompactionStrategy]. @@ -62,31 +75,307 @@ pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> Compactio } } +/// Compaction scheduler tracks and manages compaction tasks. pub(crate) struct CompactionScheduler { scheduler: SchedulerRef, - // TODO(hl): maybe tracks region compaction status in CompactionScheduler + /// Compacting regions. + region_status: HashMap, + /// Request sender of the worker that this scheduler belongs to. + request_sender: Sender, } impl CompactionScheduler { - pub(crate) fn new(scheduler: SchedulerRef) -> Self { - Self { scheduler } + pub(crate) fn new(scheduler: SchedulerRef, request_sender: Sender) -> Self { + Self { + scheduler, + region_status: HashMap::new(), + request_sender, + } } - /// Schedules a region compaction task. - pub(crate) fn schedule_compaction(&self, req: CompactionRequest) -> Result<()> { - self.scheduler.schedule(Box::pin(async { - // TODO(hl): build picker according to region options. - let picker = - compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); - debug!( - "Pick compaction strategy {:?} for region: {}", - picker, - req.region_id() - ); - let Some(mut task) = picker.pick(req) else { - return; - }; - task.run().await; - })) + /// Schedules a compaction for the region. + pub(crate) fn schedule_compaction( + &mut self, + region_id: RegionId, + version_control: &VersionControlRef, + access_layer: &AccessLayerRef, + file_purger: &FilePurgerRef, + waiter: OptionOutputTx, + ) -> Result<()> { + let status = self.region_status.entry(region_id).or_insert_with(|| { + CompactionStatus::new( + region_id, + version_control.clone(), + access_layer.clone(), + file_purger.clone(), + ) + }); + if status.compacting { + // Region is compacting. Add the waiter to pending list. + status.merge_waiter(waiter); + return Ok(()); + } + + // The region can compact directly. + let request = status.new_compaction_request(self.request_sender.clone(), waiter); + // Mark the region as compacting. + status.compacting = true; + self.schedule_compaction_request(request) + } + + /// Notifies the scheduler that the compaction job is finished successfully. + pub(crate) fn on_compaction_finished(&mut self, region_id: RegionId) { + let Some(status) = self.region_status.get_mut(®ion_id) else { + return; + }; + status.compacting = false; + // We should always try to compact the region until picker returns None. + let request = + status.new_compaction_request(self.request_sender.clone(), OptionOutputTx::none()); + // Try to schedule next compaction task for this region. + if let Err(e) = self.schedule_compaction_request(request) { + error!(e; "Failed to schedule next compaction for region {}", region_id); + } + } + + /// Notifies the scheduler that the compaction job is failed. + pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc) { + error!(err; "Region {} failed to flush, cancel all pending tasks", region_id); + // Remove this region. 
+ let Some(status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Fast fail: cancels all pending tasks and sends error to their waiters. + status.on_failure(err); + } + + /// Notifies the scheduler that the region is dropped. + pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) { + self.remove_region_on_failure( + region_id, + Arc::new(RegionDroppedSnafu { region_id }.build()), + ); + } + + /// Notifies the scheduler that the region is closed. + pub(crate) fn on_region_closed(&mut self, region_id: RegionId) { + self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build())); + } + + /// Notifies the scheduler that the region is truncated. + pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) { + self.remove_region_on_failure( + region_id, + Arc::new(RegionTruncatedSnafu { region_id }.build()), + ); + } + + /// Schedules a compaction request. + /// + /// If the region has nothing to compact, it removes the region from the status map. + fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> { + // TODO(hl): build picker according to region options. + let picker = + compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); + let region_id = request.region_id(); + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, region_id + ); + let Some(mut task) = picker.pick(request) else { + // Nothing to compact, remove it from the region status map. + self.region_status.remove(®ion_id); + return Ok(()); + }; + + // Submit the compaction task. + self.scheduler + .schedule(Box::pin(async move { + task.run().await; + })) + .map_err(|e| { + error!(e; "Failed to submit compaction request for region {}", region_id); + + // If failed to submit the job, we need to remove the region from the scheduler. + self.region_status.remove(®ion_id); + + e + }) + } + + fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { + // Remove this region. + let Some(status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Notifies all pending tasks. + status.on_failure(err); + } +} + +impl Drop for CompactionScheduler { + fn drop(&mut self) { + for (region_id, status) in self.region_status.drain() { + // We are shutting down so notify all pending tasks. + status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); + } + } +} + +/// Pending compaction tasks. +struct PendingCompaction { + waiters: Vec, +} + +impl PendingCompaction { + /// Push waiter to the request. + fn push_waiter(&mut self, mut waiter: OptionOutputTx) { + if let Some(waiter) = waiter.take_inner() { + self.waiters.push(waiter); + } + } + + /// Send flush error to waiter. + fn on_failure(&mut self, region_id: RegionId, err: Arc) { + for waiter in self.waiters.drain(..) { + waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id })); + } + } +} + +/// Status of running and pending region compaction tasks. +struct CompactionStatus { + /// Id of the region. + region_id: RegionId, + /// Version control of the region. + version_control: VersionControlRef, + /// Access layer of the region. + access_layer: AccessLayerRef, + /// File purger of the region. + file_purger: FilePurgerRef, + /// Whether a compaction task is running. + compacting: bool, + /// Compaction pending to schedule. + /// + /// For simplicity, we merge all pending compaction requests into one. 
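+ /// Their waiters are moved into the next compaction request and notified when it completes.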
+ pending_compaction: Option, +} + +impl CompactionStatus { + /// Creates a new [CompactionStatus] + fn new( + region_id: RegionId, + version_control: VersionControlRef, + access_layer: AccessLayerRef, + file_purger: FilePurgerRef, + ) -> CompactionStatus { + CompactionStatus { + region_id, + version_control, + access_layer, + file_purger, + compacting: false, + pending_compaction: None, + } + } + + /// Merge the watier to the pending compaction. + fn merge_waiter(&mut self, waiter: OptionOutputTx) { + let pending = self + .pending_compaction + .get_or_insert_with(|| PendingCompaction { + waiters: Vec::new(), + }); + pending.push_waiter(waiter); + } + + fn on_failure(self, err: Arc) { + if let Some(mut pending) = self.pending_compaction { + pending.on_failure(self.region_id, err.clone()); + } + } + + /// Creates a new compaction request for compaction picker. + /// + /// It consumes all pending compaction waiters. + fn new_compaction_request( + &mut self, + request_sender: Sender, + waiter: OptionOutputTx, + ) -> CompactionRequest { + let current_version = self.version_control.current().version; + let mut req = CompactionRequest { + current_version, + access_layer: self.access_layer.clone(), + // TODO(hl): get TTL info from region metadata + ttl: None, + // TODO(hl): get persisted region compaction time window + compaction_time_window: None, + request_sender: request_sender.clone(), + waiters: Vec::new(), + file_purger: self.file_purger.clone(), + }; + + if let Some(pending) = self.pending_compaction.take() { + req.waiters = pending.waiters; + } + req.push_waiter(waiter); + + req + } +} + +#[cfg(test)] +mod tests { + use common_query::Output; + use tokio::sync::oneshot; + + use super::*; + use crate::test_util::scheduler_util::SchedulerEnv; + use crate::test_util::version_util::VersionControlBuilder; + + #[tokio::test] + async fn test_schedule_empty() { + let env = SchedulerEnv::new(); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let mut builder = VersionControlBuilder::new(); + let purger = builder.file_purger(); + + // Nothing to compact. + let version_control = Arc::new(builder.build()); + let (output_tx, output_rx) = oneshot::channel(); + let waiter = OptionOutputTx::from(output_tx); + scheduler + .schedule_compaction( + builder.region_id(), + &version_control, + &env.access_layer, + &purger, + waiter, + ) + .unwrap(); + let output = output_rx.await.unwrap().unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + assert!(scheduler.region_status.is_empty()); + + // Only one file, picker won't compact it. 
+ let version_control = Arc::new(builder.push_l0_file(0, 1000).build()); + let (output_tx, output_rx) = oneshot::channel(); + let waiter = OptionOutputTx::from(output_tx); + scheduler + .schedule_compaction( + builder.region_id(), + &version_control, + &env.access_layer, + &purger, + waiter, + ) + .unwrap(); + let output = output_rx.await.unwrap().unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + assert!(scheduler.region_status.is_empty()); } } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 1b6293b9c8..9812e2c00f 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -35,7 +35,7 @@ use crate::compaction::CompactionRequest; use crate::error; use crate::error::CompactRegionSnafu; use crate::request::{ - BackgroundNotify, CompactionFailed, CompactionFinished, OptionOutputTx, WorkerRequest, + BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::file_purger::FilePurgerRef; @@ -123,7 +123,7 @@ impl Picker for TwcsPicker { ttl, compaction_time_window, request_sender, - waiter, + waiters, file_purger, } = req; @@ -156,8 +156,10 @@ impl Picker for TwcsPicker { let outputs = self.build_output(&windows, active_window, time_window_size); if outputs.is_empty() && expired_ssts.is_empty() { - // Nothing to compact. - waiter.send(Ok(Output::AffectedRows(0))); + // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request. + for waiter in waiters { + waiter.send(Ok(Output::AffectedRows(0))); + } return None; } let task = TwcsCompactionTask { @@ -169,7 +171,7 @@ impl Picker for TwcsPicker { sst_write_buffer_size: ReadableSize::mb(4), compaction_time_window: None, request_sender, - sender: waiter, + waiters, file_purger, }; Some(Box::new(task)) @@ -227,8 +229,8 @@ pub(crate) struct TwcsCompactionTask { pub file_purger: FilePurgerRef, /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, - /// Sender that are used to notify waiters waiting for pending compaction tasks. - pub sender: OptionOutputTx, + /// Senders that are used to notify waiters waiting for pending compaction tasks. + pub waiters: Vec, } impl Debug for TwcsCompactionTask { @@ -321,10 +323,11 @@ impl TwcsCompactionTask { /// Handles compaction failure, notifies all waiters. fn on_failure(&mut self, err: Arc) { - self.sender - .send_mut(Err(err.clone()).context(CompactRegionSnafu { + for waiter in self.waiters.drain(..) { + waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id: self.region_id, })); + } } /// Notifies region worker to handle post-compaction tasks. @@ -352,7 +355,7 @@ impl CompactionTask for TwcsCompactionTask { region_id: self.region_id, compaction_outputs: added, compacted_files: deleted, - sender: self.sender.take(), + senders: std::mem::take(&mut self.waiters), file_purger: self.file_purger.clone(), }) } diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 9c196d78b9..9a88ecbc49 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -121,10 +121,12 @@ async fn test_compaction_region() { .handle_request(region_id, RegionRequest::Create(request)) .await .unwrap(); + // Flush 5 SSTs for compaction. 
put_and_flush(&engine, region_id, &column_schemas, 0..10).await; put_and_flush(&engine, region_id, &column_schemas, 10..20).await; put_and_flush(&engine, region_id, &column_schemas, 20..30).await; - delete_and_flush(&engine, region_id, &column_schemas, 25..30).await; + delete_and_flush(&engine, region_id, &column_schemas, 15..30).await; + put_and_flush(&engine, region_id, &column_schemas, 15..25).await; let output = engine .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) @@ -132,10 +134,14 @@ async fn test_compaction_region() { .unwrap(); assert!(matches!(output, Output::AffectedRows(0))); - let stream = engine - .handle_query(region_id, ScanRequest::default()) - .await - .unwrap(); + let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + assert_eq!( + 1, + scanner.num_files(), + "unexpected files: {:?}", + scanner.file_ids() + ); + let stream = scanner.scan().await.unwrap(); let vec = collect_stream_ts(stream).await; assert_eq!((0..25).map(|v| v * 1000).collect::>(), vec); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 4925e76b60..7986db4c8a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -402,8 +402,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Region {} is truncating, location: {}", region_id, location))] - RegionTruncating { + #[snafu(display("Region {} is truncated, location: {}", region_id, location))] + RegionTruncated { region_id: RegionId, location: Location, }, @@ -516,7 +516,7 @@ impl ErrorExt for Error { FlushRegion { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, - RegionTruncating { .. } => StatusCode::Cancelled, + RegionTruncated { .. } => StatusCode::Cancelled, RejectWrite { .. } => StatusCode::StorageUnavailable, CompactRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 1ae246ed20..73c7d54b9a 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -26,12 +26,11 @@ use tokio::sync::mpsc; use crate::access_layer::AccessLayerRef; use crate::error::{ - Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatingSnafu, Result, + Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, }; use crate::memtable::MemtableBuilderRef; use crate::read::Source; -use crate::region::version::{VersionControlData, VersionRef}; -use crate::region::MitoRegionRef; +use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::request::{ BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -218,10 +217,10 @@ impl RegionFlushTask { /// Converts the flush task into a background job. /// /// We must call this in the region worker. - fn into_flush_job(mut self, region: &MitoRegionRef) -> Job { + fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job { // Get a version of this region before creating a job to get current // wal entry id, sequence and immutable memtables. - let version_data = region.version_control.current(); + let version_data = version_control.current(); Box::pin(async move { self.do_flush(version_data).await; @@ -353,14 +352,15 @@ impl FlushScheduler { /// Schedules a flush `task` for specific `region`. 
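+ /// If the region has nothing to flush, the task finishes immediately; if a flush is already running, the task is merged into the pending task.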
pub(crate) fn schedule_flush( &mut self, - region: &MitoRegionRef, + region_id: RegionId, + version_control: &VersionControlRef, task: RegionFlushTask, ) -> Result<()> { - debug_assert_eq!(region.region_id, task.region_id); + debug_assert_eq!(region_id, task.region_id); - let version = region.version_control.current().version; + let version = version_control.current().version; if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() { - debug_assert!(!self.region_status.contains_key(®ion.region_id)); + debug_assert!(!self.region_status.contains_key(®ion_id)); // The region has nothing to flush. task.on_success(); return Ok(()); @@ -369,8 +369,8 @@ impl FlushScheduler { // Add this region to status map. let flush_status = self .region_status - .entry(region.region_id) - .or_insert_with(|| FlushStatus::new(region.clone())); + .entry(region_id) + .or_insert_with(|| FlushStatus::new(region_id, version_control.clone())); // Checks whether we can flush the region now. if flush_status.flushing { // There is already a flush job running. @@ -378,6 +378,7 @@ impl FlushScheduler { return Ok(()); } + // TODO(yingwen): We can merge with pending and execute directly. // If there are pending tasks, then we should push it to pending list. if flush_status.pending_task.is_some() { flush_status.merge_task(task); @@ -385,18 +386,16 @@ impl FlushScheduler { } // Now we can flush the region directly. - region - .version_control - .freeze_mutable(&task.memtable_builder); + version_control.freeze_mutable(&task.memtable_builder); // Submit a flush job. - let job = task.into_flush_job(region); + let job = task.into_flush_job(version_control); if let Err(e) = self.scheduler.schedule(job) { // If scheduler returns error, senders in the job will be dropped and waiters // can get recv errors. - error!(e; "Failed to schedule flush job for region {}", region.region_id); + error!(e; "Failed to schedule flush job for region {}", region_id); // Remove from region status if we can't submit the task. - self.region_status.remove(®ion.region_id); + self.region_status.remove(®ion_id); return Err(e); } flush_status.flushing = true; @@ -435,7 +434,7 @@ impl FlushScheduler { pending_requests } - /// Notifies the scheduler that the flush job is finished. + /// Notifies the scheduler that the flush job is failed. pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc) { error!(err; "Region {} failed to flush, cancel all pending tasks", region_id); @@ -466,15 +465,15 @@ impl FlushScheduler { self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build())); } - /// Notifies the scheduler that the region is truncating. - pub(crate) fn on_region_truncating(&mut self, region_id: RegionId) { + /// Notifies the scheduler that the region is truncated. + pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) { self.remove_region_on_failure( region_id, - Arc::new(RegionTruncatingSnafu { region_id }.build()), + Arc::new(RegionTruncatedSnafu { region_id }.build()), ); } - pub(crate) fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { + fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { // Remove this region. 
let Some(flush_status) = self.region_status.remove(®ion_id) else { return; @@ -518,7 +517,7 @@ impl FlushScheduler { debug_assert!(self .region_status .values() - .all(|status| !status.flushing && status.pending_task.is_some())); + .all(|status| status.flushing || status.pending_task.is_some())); // Get the first region from status map. let Some(flush_status) = self @@ -530,9 +529,10 @@ impl FlushScheduler { }; debug_assert!(!flush_status.flushing); let task = flush_status.pending_task.take().unwrap(); - let region = flush_status.region.clone(); + let region_id = flush_status.region_id; + let version_control = flush_status.version_control.clone(); - self.schedule_flush(®ion, task) + self.schedule_flush(region_id, &version_control, task) } } @@ -550,8 +550,13 @@ impl Drop for FlushScheduler { /// Tracks running and pending flush tasks and all pending requests of a region. struct FlushStatus { /// Current region. - region: MitoRegionRef, + region_id: RegionId, + /// Version control of the region. + version_control: VersionControlRef, /// There is a flush task running. + /// + /// It is possible that a region is not flushing but has pending task if the scheduler + /// doesn't schedules this region. flushing: bool, /// Task waiting for next flush. pending_task: Option, @@ -562,9 +567,10 @@ struct FlushStatus { } impl FlushStatus { - fn new(region: MitoRegionRef) -> FlushStatus { + fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus { FlushStatus { - region, + region_id, + version_control, flushing: false, pending_task: None, pending_ddls: Vec::new(), @@ -587,14 +593,14 @@ impl FlushStatus { } for ddl in self.pending_ddls { ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu { - region_id: self.region.region_id, + region_id: self.region_id, })); } for write_req in self.pending_writes { write_req .sender .send(Err(err.clone()).context(FlushRegionSnafu { - region_id: self.region.region_id, + region_id: self.region_id, })); } } @@ -602,7 +608,11 @@ impl FlushStatus { #[cfg(test)] mod tests { + use tokio::sync::oneshot; + use super::*; + use crate::test_util::scheduler_util::SchedulerEnv; + use crate::test_util::version_util::VersionControlBuilder; #[test] fn test_get_mutable_limit() { @@ -656,4 +666,33 @@ mod tests { manager.reserve_mem(100); assert!(manager.should_flush_engine()); } + + #[tokio::test] + async fn test_schedule_empty() { + let env = SchedulerEnv::new(); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_flush_scheduler(); + let builder = VersionControlBuilder::new(); + + let version_control = Arc::new(builder.build()); + let (output_tx, output_rx) = oneshot::channel(); + let mut task = RegionFlushTask { + region_id: builder.region_id(), + reason: FlushReason::Others, + senders: Vec::new(), + request_sender: tx, + access_layer: env.access_layer.clone(), + memtable_builder: builder.memtable_builder(), + file_purger: builder.file_purger(), + listener: WorkerListener::default(), + }; + task.push_sender(OptionOutputTx::from(output_tx)); + scheduler + .schedule_flush(builder.region_id(), &version_control, task) + .unwrap(); + assert!(scheduler.region_status.is_empty()); + let output = output_rx.await.unwrap().unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + assert!(scheduler.region_status.is_empty()); + } } diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 82675e9ed3..d9c2f2e40b 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -20,7 +20,7 @@ pub mod key_values; pub(crate) mod 
version; use std::fmt; -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use common_query::logical_plan::Expr; @@ -89,45 +89,6 @@ pub trait MemtableBuilder: Send + Sync + fmt::Debug { pub type MemtableBuilderRef = Arc; -// TODO(yingwen): Remove it once we port the memtable. -/// Empty memtable for test. -#[derive(Debug, Default)] -pub(crate) struct EmptyMemtable { - /// Id of this memtable. - id: MemtableId, -} - -impl EmptyMemtable { - /// Returns a new memtable with specific `id`. - pub(crate) fn new(id: MemtableId) -> EmptyMemtable { - EmptyMemtable { id } - } -} - -impl Memtable for EmptyMemtable { - fn id(&self) -> MemtableId { - self.id - } - - fn write(&self, _kvs: &KeyValues) -> Result<()> { - Ok(()) - } - - fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator { - Box::new(std::iter::empty()) - } - - fn is_empty(&self) -> bool { - true - } - - fn mark_immutable(&self) {} - - fn stats(&self) -> MemtableStats { - MemtableStats::default() - } -} - /// Memtable memory allocation tracker. #[derive(Default)] pub struct AllocTracker { @@ -205,21 +166,6 @@ impl Drop for AllocTracker { } } -/// Default memtable builder. -#[derive(Debug, Default)] -pub(crate) struct DefaultMemtableBuilder { - /// Next memtable id. - next_id: AtomicU32, -} - -impl MemtableBuilder for DefaultMemtableBuilder { - fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef { - Arc::new(EmptyMemtable::new( - self.next_id.fetch_add(1, Ordering::Relaxed), - )) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 752821834e..be8b318d83 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -59,6 +59,13 @@ impl Scanner { Scanner::Seq(seq_scan) => seq_scan.num_memtables(), } } + + /// Returns SST file ids to scan. + pub(crate) fn file_ids(&self) -> Vec { + match self { + Scanner::Seq(seq_scan) => seq_scan.file_ids(), + } + } } #[cfg_attr(doc, aquamarine::aquamarine)] diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 28e5249fec..148cb37771 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -156,4 +156,9 @@ impl SeqScan { pub(crate) fn num_files(&self) -> usize { self.files.len() } + + /// Returns SST file ids to scan. + pub(crate) fn file_ids(&self) -> Vec { + self.files.iter().map(|file| file.file_id()).collect() + } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index f4b4352cc4..1fb2444690 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -42,7 +42,8 @@ use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::{ - CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, InvalidRequestSnafu, Result, + CompactRegionSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, + InvalidRequestSnafu, Result, }; use crate::memtable::MemtableId; use crate::sst::file::FileMeta; @@ -661,15 +662,17 @@ pub(crate) struct CompactionFinished { pub(crate) compaction_outputs: Vec, /// Compacted files that are to be removed from region version. pub(crate) compacted_files: Vec, - /// Compaction result sender. - pub(crate) sender: OptionOutputTx, + /// Compaction result senders. + pub(crate) senders: Vec, /// File purger for cleaning files on failure. 
pub(crate) file_purger: FilePurgerRef, } impl CompactionFinished { pub fn on_success(self) { - self.sender.send(Ok(AffectedRows(0))); + for sender in self.senders { + sender.send(Ok(AffectedRows(0))); + } info!("Successfully compacted region: {}", self.region_id); } } @@ -678,7 +681,12 @@ impl OnFailure for CompactionFinished { /// Compaction succeeded but failed to update manifest or region's already been dropped, /// clean compaction output files. fn on_failure(&mut self, err: Error) { - self.sender.send_mut(Err(err)); + let err = Arc::new(err); + for sender in self.senders.drain(..) { + sender.send(Err(err.clone()).context(CompactRegionSnafu { + region_id: self.region_id, + })); + } for file in &self.compacted_files { let file_id = file.file_id; warn!( diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 2f29c8aa6d..43195da18f 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -14,6 +14,10 @@ //! Utilities for testing. +pub mod memtable_util; +pub mod scheduler_util; +pub mod version_util; + use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs new file mode 100644 index 0000000000..4b6c4142e7 --- /dev/null +++ b/src/mito2/src/test_util/memtable_util.rs @@ -0,0 +1,81 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Memtable test utilities. + +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use common_query::logical_plan::Expr; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; + +use crate::error::Result; +use crate::memtable::{ + BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, + MemtableStats, +}; + +/// Empty memtable for test. +#[derive(Debug, Default)] +pub(crate) struct EmptyMemtable { + /// Id of this memtable. + id: MemtableId, +} + +impl EmptyMemtable { + /// Returns a new memtable with specific `id`. + pub(crate) fn new(id: MemtableId) -> EmptyMemtable { + EmptyMemtable { id } + } +} + +impl Memtable for EmptyMemtable { + fn id(&self) -> MemtableId { + self.id + } + + fn write(&self, _kvs: &KeyValues) -> Result<()> { + Ok(()) + } + + fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator { + Box::new(std::iter::empty()) + } + + fn is_empty(&self) -> bool { + true + } + + fn mark_immutable(&self) {} + + fn stats(&self) -> MemtableStats { + MemtableStats::default() + } +} + +/// Empty memtable builder. +#[derive(Debug, Default)] +pub(crate) struct EmptyMemtableBuilder { + /// Next memtable id. 
+ next_id: AtomicU32, +} + +impl MemtableBuilder for EmptyMemtableBuilder { + fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(EmptyMemtable::new( + self.next_id.fetch_add(1, Ordering::Relaxed), + )) + } +} diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs new file mode 100644 index 0000000000..5e1e259e75 --- /dev/null +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to mock flush and compaction schedulers. + +use std::sync::Arc; + +use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use object_store::services::Fs; +use object_store::ObjectStore; +use tokio::sync::mpsc::Sender; + +use crate::access_layer::{AccessLayer, AccessLayerRef}; +use crate::compaction::CompactionScheduler; +use crate::flush::FlushScheduler; +use crate::request::WorkerRequest; +use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; + +/// Scheduler mocker. +pub(crate) struct SchedulerEnv { + #[allow(unused)] + path: TempDir, + /// Mock access layer for test. + pub(crate) access_layer: AccessLayerRef, + scheduler: Option, +} + +impl SchedulerEnv { + /// Creates a new mocker. + pub(crate) fn new() -> SchedulerEnv { + let path = create_temp_dir(""); + let mut builder = Fs::default(); + builder.root(path.path().to_str().unwrap()); + let object_store = ObjectStore::new(builder).unwrap().finish(); + let access_layer = Arc::new(AccessLayer::new("", object_store.clone())); + + SchedulerEnv { + path: create_temp_dir(""), + access_layer, + scheduler: None, + } + } + + /// Creates a new compaction scheduler. + pub(crate) fn mock_compaction_scheduler( + &self, + request_sender: Sender, + ) -> CompactionScheduler { + let scheduler = self.get_scheduler(); + + CompactionScheduler::new(scheduler, request_sender) + } + + /// Creates a new flush scheduler. + pub(crate) fn mock_flush_scheduler(&self) -> FlushScheduler { + let scheduler = self.get_scheduler(); + + FlushScheduler::new(scheduler) + } + + fn get_scheduler(&self) -> SchedulerRef { + self.scheduler + .clone() + .unwrap_or_else(|| Arc::new(LocalScheduler::new(1))) + } +} diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs new file mode 100644 index 0000000000..2fee8a987b --- /dev/null +++ b/src/mito2/src/test_util/version_util.rs @@ -0,0 +1,115 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to mock version. + +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::SemanticType; +use common_time::Timestamp; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; +use store_api::storage::RegionId; + +use crate::memtable::{MemtableBuilder, MemtableBuilderRef}; +use crate::region::version::{Version, VersionBuilder, VersionControl}; +use crate::sst::file::{FileId, FileMeta}; +use crate::sst::file_purger::FilePurgerRef; +use crate::test_util::memtable_util::EmptyMemtableBuilder; +use crate::test_util::new_noop_file_purger; + +fn new_region_metadata(region_id: RegionId) -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(region_id); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag_0", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .primary_key(vec![2]); + builder.build().unwrap() +} + +// Builder to mock a version control. +pub(crate) struct VersionControlBuilder { + metadata: RegionMetadata, + file_purger: FilePurgerRef, + memtable_builder: Arc, + files: HashMap, +} + +impl VersionControlBuilder { + pub(crate) fn new() -> VersionControlBuilder { + VersionControlBuilder { + metadata: new_region_metadata(RegionId::new(1, 1)), + file_purger: new_noop_file_purger(), + memtable_builder: Arc::new(EmptyMemtableBuilder::default()), + files: HashMap::new(), + } + } + + pub(crate) fn region_id(&self) -> RegionId { + self.metadata.region_id + } + + pub(crate) fn file_purger(&self) -> FilePurgerRef { + self.file_purger.clone() + } + + pub(crate) fn memtable_builder(&self) -> MemtableBuilderRef { + self.memtable_builder.clone() + } + + pub(crate) fn push_l0_file(&mut self, start_ms: i64, end_ms: i64) -> &mut Self { + let file_id = FileId::random(); + self.files.insert( + file_id, + FileMeta { + region_id: self.metadata.region_id, + file_id, + time_range: ( + Timestamp::new_millisecond(start_ms), + Timestamp::new_millisecond(end_ms), + ), + level: 0, + file_size: 0, // We don't care file size. 
+ }, + ); + self + } + + pub(crate) fn build_version(&self) -> Version { + let metadata = Arc::new(self.metadata.clone()); + let mutable = self.memtable_builder.build(&metadata); + VersionBuilder::new(metadata, mutable) + .add_files(self.file_purger.clone(), self.files.values().cloned()) + .build() + } + + pub(crate) fn build(&self) -> VersionControl { + let version = self.build_version(); + VersionControl::new(version) + } +} diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3cef393f7c..2f72724fd8 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -251,7 +251,7 @@ impl WorkerStarter { scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler.clone()), - compaction_scheduler: CompactionScheduler::new(self.scheduler), + compaction_scheduler: CompactionScheduler::new(self.scheduler, sender.clone()), stalled_requests: StalledRequests::default(), listener: self.listener, }; diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 38813d324c..61cb343e80 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -55,7 +55,10 @@ impl RegionWorkerLoop { // Try to submit a flush task. let task = self.new_flush_task(®ion, FlushReason::Alter); - if let Err(e) = self.flush_scheduler.schedule_flush(®ion, task) { + if let Err(e) = + self.flush_scheduler + .schedule_flush(region.region_id, ®ion.version_control, task) + { // Unable to flush the region, send error to waiter. sender.send(Err(e)); return; diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 03899dd59f..2d786dd9cb 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -33,6 +33,8 @@ impl RegionWorkerLoop { self.regions.remove_region(region_id); // Clean flush status. self.flush_scheduler.on_region_closed(region_id); + // Clean compaction status. + self.compaction_scheduler.on_region_closed(region_id); info!("Region {} closed", region_id); diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 101e9011eb..0e0f3a07ef 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -16,9 +16,7 @@ use common_telemetry::{error, info}; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use crate::compaction::CompactionRequest; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; @@ -33,8 +31,13 @@ impl RegionWorkerLoop { return; }; - let request = self.new_compaction_request(®ion, sender); - if let Err(e) = self.compaction_scheduler.schedule_compaction(request) { + if let Err(e) = self.compaction_scheduler.schedule_compaction( + region.region_id, + ®ion.version_control, + ®ion.access_layer, + ®ion.file_purger, + sender, + ) { error!(e; "Failed to schedule compaction task for region: {}", region_id); } else { info!( @@ -74,31 +77,16 @@ impl RegionWorkerLoop { .version_control .apply_edit(edit, &[], region.file_purger.clone()); request.on_success(); + + // Schedule next compaction if necessary. + self.compaction_scheduler.on_compaction_finished(region_id); } /// When compaction fails, we simply log the error. 
pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) { error!(req.err; "Failed to compact region: {}", req.region_id); - } - /// Creates a new compaction request. - fn new_compaction_request( - &self, - region: &MitoRegionRef, - waiter: OptionOutputTx, - ) -> CompactionRequest { - let current_version = region.version_control.current().version; - let access_layer = region.access_layer.clone(); - let file_purger = region.file_purger.clone(); - - CompactionRequest { - current_version, - access_layer, - ttl: None, // TODO(hl): get TTL info from region metadata - compaction_time_window: None, // TODO(hl): get persisted region compaction time window - request_sender: self.sender.clone(), - waiter, - file_purger, - } + self.compaction_scheduler + .on_compaction_failed(req.region_id, req.err); } } diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index cfb4aca89f..f7aa5d15dc 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -52,6 +52,8 @@ impl RegionWorkerLoop { self.dropping_regions.insert_region(region.clone()); // Notifies flush scheduler. self.flush_scheduler.on_region_dropped(region_id); + // Notifies compaction scheduler. + self.compaction_scheduler.on_region_dropped(region_id); // mark region version as dropped region.version_control.mark_dropped(); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index cb2f2746dc..2c1bad5cf5 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -14,12 +14,12 @@ //! Handling flush related requests. -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use common_time::util::current_time_millis; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use crate::error::{RegionTruncatingSnafu, Result}; +use crate::error::{RegionTruncatedSnafu, Result}; use crate::flush::{FlushReason, RegionFlushTask}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; @@ -39,7 +39,10 @@ impl RegionWorkerLoop { let mut task = self.new_flush_task(®ion, FlushReason::Manual); task.push_sender(sender); - if let Err(e) = self.flush_scheduler.schedule_flush(®ion, task) { + if let Err(e) = + self.flush_scheduler + .schedule_flush(region.region_id, ®ion.version_control, task) + { error!(e; "Failed to schedule flush task for region {}", region.region_id); } } @@ -90,7 +93,11 @@ impl RegionWorkerLoop { if region.last_flush_millis() < min_last_flush_time { // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. 
let task = self.new_flush_task(region, FlushReason::EngineFull); - self.flush_scheduler.schedule_flush(region, task)?; + self.flush_scheduler.schedule_flush( + region.region_id, + ®ion.version_control, + task, + )?; } } @@ -99,7 +106,11 @@ impl RegionWorkerLoop { if let Some(region) = max_mem_region { if !self.flush_scheduler.is_flush_requested(region.region_id) { let task = self.new_flush_task(region, FlushReason::EngineFull); - self.flush_scheduler.schedule_flush(region, task)?; + self.flush_scheduler.schedule_flush( + region.region_id, + ®ion.version_control, + task, + )?; } } @@ -141,7 +152,7 @@ impl RegionWorkerLoop { let version_data = region.version_control.current(); if let Some(truncated_entry_id) = version_data.version.truncated_entry_id { if truncated_entry_id >= request.flushed_entry_id { - request.on_failure(RegionTruncatingSnafu { region_id }.build()); + request.on_failure(RegionTruncatedSnafu { region_id }.build()); return; } } @@ -198,6 +209,20 @@ impl RegionWorkerLoop { // We already stalled these requests, don't stall them again. self.handle_write_requests(stalled.requests, false).await; + // Schedules compaction. + if let Err(e) = self.compaction_scheduler.schedule_compaction( + region.region_id, + ®ion.version_control, + ®ion.access_layer, + ®ion.file_purger, + OptionOutputTx::none(), + ) { + warn!( + "Failed to schedule compaction after flush, region: {}, err: {}", + region.region_id, e + ); + } + self.listener.on_flush_success(region_id); } } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 759f17b90c..5b1e9db8a9 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -44,8 +44,9 @@ impl RegionWorkerLoop { region.manifest_manager.update(action_list).await?; // Notifies flush scheduler. - self.flush_scheduler.on_region_truncating(region_id); - // TODO(DevilExileSu): Notifies compaction scheduler. + self.flush_scheduler.on_region_truncated(region_id); + // Notifies compaction scheduler. + self.compaction_scheduler.on_region_truncated(region_id); // Reset region's version and mark all SSTs deleted. region.version_control.truncate(