From 08e84ea331e3ca4c36466fbcffcfd226875f8ef0 Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 26 May 2026 15:41:53 +0800 Subject: [PATCH] last schedule compaction time Signed-off-by: luofucong --- src/mito2/src/region.rs | 21 +++++++++++---------- src/mito2/src/region/opener.rs | 4 ++-- src/mito2/src/worker/handle_compaction.rs | 17 +++++++++-------- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index c85599bf58..c51432fc09 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -140,8 +140,8 @@ pub struct MitoRegion { pub(crate) provider: Provider, /// Last flush time in millis. last_flush_millis: AtomicI64, - /// Last compaction time in millis. - last_compaction_millis: AtomicI64, + /// Last schedule compaction time in millis. + last_schedule_compaction_millis: AtomicI64, /// Provider to get current time. time_provider: TimeProviderRef, /// The topic's latest entry id since the region's last flushing. @@ -234,15 +234,16 @@ impl MitoRegion { self.last_flush_millis.store(now, Ordering::Relaxed); } - /// Returns last compaction timestamp in millis. - pub(crate) fn last_compaction_millis(&self) -> i64 { - self.last_compaction_millis.load(Ordering::Relaxed) + /// Returns last schedule compaction timestamp in millis. + pub(crate) fn last_schedule_compaction_millis(&self) -> i64 { + self.last_schedule_compaction_millis.load(Ordering::Relaxed) } - /// Update compaction time to current time. - pub(crate) fn update_compaction_millis(&self) { + /// Update schedule compaction time to current time. + pub(crate) fn update_schedule_compaction_millis(&self) { let now = self.time_provider.current_time_millis(); - self.last_compaction_millis.store(now, Ordering::Relaxed); + self.last_schedule_compaction_millis + .store(now, Ordering::Relaxed); } /// Returns the table dir. @@ -1658,7 +1659,7 @@ mod tests { file_purger: crate::test_util::new_noop_file_purger(), provider: Provider::noop_provider(), last_flush_millis: Default::default(), - last_compaction_millis: Default::default(), + last_schedule_compaction_millis: Default::default(), time_provider: Arc::new(StdTimeProvider), topic_latest_entry_id: Default::default(), written_bytes: Arc::new(AtomicU64::new(0)), @@ -2015,7 +2016,7 @@ mod tests { file_purger: crate::test_util::new_noop_file_purger(), provider: Provider::noop_provider(), last_flush_millis: Default::default(), - last_compaction_millis: Default::default(), + last_schedule_compaction_millis: Default::default(), time_provider: Arc::new(StdTimeProvider), topic_latest_entry_id: Default::default(), written_bytes: Arc::new(AtomicU64::new(0)), diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 6785181b48..16aed588fe 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -345,7 +345,7 @@ impl RegionOpener { ), provider, last_flush_millis: AtomicI64::new(now), - last_compaction_millis: AtomicI64::new(now), + last_schedule_compaction_millis: AtomicI64::new(0), time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(0), written_bytes: Arc::new(AtomicU64::new(0)), @@ -581,7 +581,7 @@ impl RegionOpener { file_purger, provider: provider.clone(), last_flush_millis: AtomicI64::new(now), - last_compaction_millis: AtomicI64::new(now), + last_schedule_compaction_millis: AtomicI64::new(0), time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id), written_bytes: Arc::new(AtomicU64::new(0)), diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index fd021771b1..05623ae7df 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -80,7 +80,6 @@ impl RegionWorkerLoop { return; } }; - region.update_compaction_millis(); region.version_control.apply_edit( Some(request.edit.clone()), @@ -160,9 +159,10 @@ impl RegionWorkerLoop { return; } let now = self.time_provider.current_time_millis(); - if now - region.last_compaction_millis() + if now - region.last_schedule_compaction_millis() >= self.config.min_compaction_interval.as_millis() as i64 - && let Err(e) = self + { + match self .compaction_scheduler .schedule_compaction( region.region_id, @@ -175,11 +175,12 @@ impl RegionWorkerLoop { 1, // Default for automatic compaction ) .await - { - warn!( - "Failed to schedule compaction for region: {}, err: {}", - region.region_id, e - ); + { + Ok(()) => region.update_schedule_compaction_millis(), + Err(e) => { + error!(e; "Failed to schedule compaction for region: {}", region.region_id) + } + } } } }