mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-03 20:40:37 +00:00
last schedule compaction time
Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
@@ -140,8 +140,8 @@ pub struct MitoRegion {
|
||||
pub(crate) provider: Provider,
|
||||
/// Last flush time in millis.
|
||||
last_flush_millis: AtomicI64,
|
||||
/// Last compaction time in millis.
|
||||
last_compaction_millis: AtomicI64,
|
||||
/// Last schedule compaction time in millis.
|
||||
last_schedule_compaction_millis: AtomicI64,
|
||||
/// Provider to get current time.
|
||||
time_provider: TimeProviderRef,
|
||||
/// The topic's latest entry id since the region's last flushing.
|
||||
@@ -234,15 +234,16 @@ impl MitoRegion {
|
||||
self.last_flush_millis.store(now, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Returns last compaction timestamp in millis.
|
||||
pub(crate) fn last_compaction_millis(&self) -> i64 {
|
||||
self.last_compaction_millis.load(Ordering::Relaxed)
|
||||
/// Returns last schedule compaction timestamp in millis.
|
||||
pub(crate) fn last_schedule_compaction_millis(&self) -> i64 {
|
||||
self.last_schedule_compaction_millis.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Update compaction time to current time.
|
||||
pub(crate) fn update_compaction_millis(&self) {
|
||||
/// Update schedule compaction time to current time.
|
||||
pub(crate) fn update_schedule_compaction_millis(&self) {
|
||||
let now = self.time_provider.current_time_millis();
|
||||
self.last_compaction_millis.store(now, Ordering::Relaxed);
|
||||
self.last_schedule_compaction_millis
|
||||
.store(now, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Returns the table dir.
|
||||
@@ -1658,7 +1659,7 @@ mod tests {
|
||||
file_purger: crate::test_util::new_noop_file_purger(),
|
||||
provider: Provider::noop_provider(),
|
||||
last_flush_millis: Default::default(),
|
||||
last_compaction_millis: Default::default(),
|
||||
last_schedule_compaction_millis: Default::default(),
|
||||
time_provider: Arc::new(StdTimeProvider),
|
||||
topic_latest_entry_id: Default::default(),
|
||||
written_bytes: Arc::new(AtomicU64::new(0)),
|
||||
@@ -2015,7 +2016,7 @@ mod tests {
|
||||
file_purger: crate::test_util::new_noop_file_purger(),
|
||||
provider: Provider::noop_provider(),
|
||||
last_flush_millis: Default::default(),
|
||||
last_compaction_millis: Default::default(),
|
||||
last_schedule_compaction_millis: Default::default(),
|
||||
time_provider: Arc::new(StdTimeProvider),
|
||||
topic_latest_entry_id: Default::default(),
|
||||
written_bytes: Arc::new(AtomicU64::new(0)),
|
||||
|
||||
@@ -345,7 +345,7 @@ impl RegionOpener {
|
||||
),
|
||||
provider,
|
||||
last_flush_millis: AtomicI64::new(now),
|
||||
last_compaction_millis: AtomicI64::new(now),
|
||||
last_schedule_compaction_millis: AtomicI64::new(0),
|
||||
time_provider: self.time_provider.clone(),
|
||||
topic_latest_entry_id: AtomicU64::new(0),
|
||||
written_bytes: Arc::new(AtomicU64::new(0)),
|
||||
@@ -581,7 +581,7 @@ impl RegionOpener {
|
||||
file_purger,
|
||||
provider: provider.clone(),
|
||||
last_flush_millis: AtomicI64::new(now),
|
||||
last_compaction_millis: AtomicI64::new(now),
|
||||
last_schedule_compaction_millis: AtomicI64::new(0),
|
||||
time_provider: self.time_provider.clone(),
|
||||
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
|
||||
written_bytes: Arc::new(AtomicU64::new(0)),
|
||||
|
||||
@@ -80,7 +80,6 @@ impl<S> RegionWorkerLoop<S> {
|
||||
return;
|
||||
}
|
||||
};
|
||||
region.update_compaction_millis();
|
||||
|
||||
region.version_control.apply_edit(
|
||||
Some(request.edit.clone()),
|
||||
@@ -160,9 +159,10 @@ impl<S> RegionWorkerLoop<S> {
|
||||
return;
|
||||
}
|
||||
let now = self.time_provider.current_time_millis();
|
||||
if now - region.last_compaction_millis()
|
||||
if now - region.last_schedule_compaction_millis()
|
||||
>= self.config.min_compaction_interval.as_millis() as i64
|
||||
&& let Err(e) = self
|
||||
{
|
||||
match self
|
||||
.compaction_scheduler
|
||||
.schedule_compaction(
|
||||
region.region_id,
|
||||
@@ -175,11 +175,12 @@ impl<S> RegionWorkerLoop<S> {
|
||||
1, // Default for automatic compaction
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"Failed to schedule compaction for region: {}, err: {}",
|
||||
region.region_id, e
|
||||
);
|
||||
{
|
||||
Ok(()) => region.update_schedule_compaction_millis(),
|
||||
Err(e) => {
|
||||
error!(e; "Failed to schedule compaction for region: {}", region.region_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user