refactor: use last compaction scheduled time instead of finished time (#8190)

* refactor: use last compaction scheduled time instead of finished time

Signed-off-by: luofucong <luofc@foxmail.com>

* add some logs for debugging

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2026-06-01 15:45:38 +08:00
committed by GitHub
parent 28fd796f4e
commit 33ffa41ac2
5 changed files with 201 additions and 50 deletions

View File

@@ -150,6 +150,7 @@ impl CompactionScheduler {
}
/// Schedules a compaction for the region.
/// Returns whether a compaction is scheduled.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn schedule_compaction(
&mut self,
@@ -161,7 +162,7 @@ impl CompactionScheduler {
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> Result<()> {
) -> Result<bool> {
// skip compaction if region is in staging state
let current_state = manifest_ctx.current_state();
if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
@@ -170,7 +171,7 @@ impl CompactionScheduler {
region_id, compact_options
);
waiter.send(Ok(0));
return Ok(());
return Ok(false);
}
if let Some(status) = self.region_status.get_mut(&region_id) {
@@ -192,7 +193,7 @@ impl CompactionScheduler {
);
}
}
return Ok(());
return Ok(false);
}
// The region can compact directly.
@@ -209,7 +210,7 @@ impl CompactionScheduler {
max_parallelism,
);
let result = match self
match self
.schedule_compaction_request(request, compact_options)
.await
{
@@ -220,14 +221,12 @@ impl CompactionScheduler {
status.active_compaction = Some(active_compaction);
self.region_status.insert(region_id, status);
Ok(())
self.listener.on_compaction_scheduled(region_id);
Ok(true)
}
Ok(None) => Ok(()),
Ok(None) => Ok(false),
Err(e) => Err(e),
};
self.listener.on_compaction_scheduled(region_id);
result
}
}
// Handle pending manual compaction request for the region.
@@ -334,6 +333,27 @@ impl CompactionScheduler {
// And skip try to schedule next compaction task.
return pending_ddl_requests;
}
Vec::new()
}
pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
self.region_status
.get(&region_id)
.map(|status| status.active_compaction.is_some())
.unwrap_or(false)
}
/// Schedules next compaction upon a finished compaction.
/// Returns whether the compaction is scheduled.
pub(crate) async fn schedule_next_compaction(
&mut self,
region_id: RegionId,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
) -> bool {
let Some(status) = self.region_status.get_mut(&region_id) else {
return false;
};
// We should always try to compact the region until picker returns None.
let request = status.new_compaction_request(
@@ -364,20 +384,21 @@ impl CompactionScheduler {
"Successfully scheduled next compaction for region id: {}",
region_id
);
true
}
Ok(None) => {
// No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region.
// All DDL requests and pending compaction requests have already been processed.
// Safe to remove the region from status tracking.
self.region_status.remove(&region_id);
false
}
Err(e) => {
error!(e; "Failed to schedule next compaction for region {}", region_id);
self.remove_region_on_failure(region_id, Arc::new(e));
false
}
}
Vec::new()
}
/// Notifies the scheduler that the compaction job is cancelled cooperatively.
@@ -1434,7 +1455,7 @@ mod tests {
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
scheduler
let scheduled = scheduler
.schedule_compaction(
builder.region_id(),
compact_request::Options::Regular(Default::default()),
@@ -1447,6 +1468,7 @@ mod tests {
)
.await
.unwrap();
assert!(!scheduled);
let output = output_rx.await.unwrap().unwrap();
assert_eq!(output, 0);
assert!(scheduler.region_status.is_empty());
@@ -1455,7 +1477,7 @@ mod tests {
let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
let (output_tx, output_rx) = oneshot::channel();
let waiter = OptionOutputTx::from(output_tx);
scheduler
let scheduled = scheduler
.schedule_compaction(
builder.region_id(),
compact_request::Options::Regular(Default::default()),
@@ -1468,11 +1490,67 @@ mod tests {
)
.await
.unwrap();
assert!(!scheduled);
let output = output_rx.await.unwrap().unwrap();
assert_eq!(output, 0);
assert!(scheduler.region_status.is_empty());
}
#[tokio::test]
async fn test_schedule_compaction_returns_true_when_task_scheduled() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let region_id = builder.region_id();
let end = 1000 * 1000;
// Five overlapping L0 files are enough for the regular picker to create a task.
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;
let scheduled = scheduler
.schedule_compaction(
region_id,
Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
// The boolean result is what the worker uses to decide whether to update
// last_schedule_compaction_millis.
assert!(scheduled);
assert_eq!(1, job_scheduler.num_jobs());
assert!(scheduler.region_status.contains_key(&region_id));
}
#[tokio::test]
async fn test_schedule_on_finished() {
common_telemetry::init_default_ut_logging();
@@ -1510,7 +1588,7 @@ mod tests {
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
scheduler
let scheduled = scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
@@ -1524,6 +1602,7 @@ mod tests {
.await
.unwrap();
// Should schedule 1 compaction.
assert!(scheduled);
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
let data = version_control.current();
@@ -1542,7 +1621,7 @@ mod tests {
);
// The task is pending.
let (tx, _rx) = oneshot::channel();
scheduler
let scheduled = scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
@@ -1555,6 +1634,7 @@ mod tests {
)
.await
.unwrap();
assert!(!scheduled);
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
assert!(
@@ -1570,6 +1650,10 @@ mod tests {
scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
.await;
let scheduled = scheduler
.schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager.clone())
.await;
assert!(scheduled);
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
@@ -1582,7 +1666,7 @@ mod tests {
);
let (tx, _rx) = oneshot::channel();
// The task is pending.
scheduler
let scheduled = scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
@@ -1595,6 +1679,7 @@ mod tests {
)
.await
.unwrap();
assert!(!scheduled);
assert_eq!(2, job_scheduler.num_jobs());
assert!(
!scheduler
@@ -2328,6 +2413,15 @@ mod tests {
.await;
assert!(pending_ddls.is_empty());
assert!(scheduler.region_status.contains_key(&region_id));
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
// With no compactable files, next scheduling returns false and removes
// the status without creating a background task.
let scheduled = scheduler
.schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert!(!scheduled);
assert!(!scheduler.region_status.contains_key(&region_id));
}
@@ -2370,6 +2464,14 @@ mod tests {
.await;
assert!(pending_ddls.is_empty());
assert!(scheduler.region_status.contains_key(&region_id));
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
// The failing scheduler simulates a submit error; callers must see false.
let scheduled = scheduler
.schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert!(!scheduled);
assert!(!scheduler.region_status.contains_key(&region_id));
}

View File

@@ -21,6 +21,7 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::DfRecordBatch;
use common_test_util::flight::encode_to_flight_data;
use common_time::Timestamp;
use common_time::util::current_time_millis;
use datatypes::arrow::array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
@@ -67,7 +68,8 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) {
default_flat_format: flat_format,
..Default::default()
};
let time_provider = Arc::new(MockTimeProvider::new(current_time_millis()));
let initial_time = current_time_millis();
let time_provider = Arc::new(MockTimeProvider::new(initial_time));
let engine = env
.create_engine_with_time(
config.clone(),
@@ -99,14 +101,22 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) {
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let initial_schedule_time = region.last_schedule_compaction_millis();
assert_eq!(initial_time, initial_schedule_time);
let new_edit = || RegionEdit {
files_to_add: vec![FileMeta {
region_id: region.region_id,
file_id: FileId::random(),
level: 0,
..Default::default()
}],
let new_edit = |file_starts: &[i64]| RegionEdit {
files_to_add: file_starts
.iter()
.map(|start| FileMeta {
region_id: region.region_id,
file_id: FileId::random(),
time_range: (
Timestamp::new_millisecond(*start),
Timestamp::new_millisecond(1000 * 1000),
),
..Default::default()
})
.collect(),
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
@@ -115,19 +125,23 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) {
committed_sequence: None,
};
engine
.edit_region(region.region_id, new_edit())
.edit_region(region.region_id, new_edit(&[0, 10, 50, 80]))
.await
.unwrap();
// Asserts that the compaction of the region is not scheduled,
// because the minimum time interval between two compactions is not passed.
assert_eq!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty));
assert_eq!(
initial_schedule_time,
region.last_schedule_compaction_millis()
);
// Simulates the time has passed the min compaction interval,
time_provider
.set_now(current_time_millis() + config.min_compaction_interval.as_millis() as i64);
let next_schedule_time = initial_time + config.min_compaction_interval.as_millis() as i64;
time_provider.set_now(next_schedule_time);
// ... then edits the region again,
engine
.edit_region(region.region_id, new_edit())
.edit_region(region.region_id, new_edit(&[90]))
.await
.unwrap();
// ... finally asserts that the compaction of the region is scheduled.
@@ -136,6 +150,9 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) {
.unwrap()
.unwrap();
assert_eq!(region_id, actual);
// Wait for the `last_schedule_compaction_millis` to update.
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(next_schedule_time, region.last_schedule_compaction_millis());
}
#[tokio::test]

View File

@@ -157,8 +157,8 @@ pub struct MitoRegion {
pub(crate) provider: Provider,
/// Last flush time in millis.
last_flush_millis: AtomicI64,
/// Last compaction time in millis.
last_compaction_millis: AtomicI64,
/// Last schedule compaction time in millis.
last_schedule_compaction_millis: AtomicI64,
/// Provider to get current time.
time_provider: TimeProviderRef,
/// The topic's latest entry id since the region's last flushing.
@@ -251,15 +251,16 @@ impl MitoRegion {
self.last_flush_millis.store(now, Ordering::Relaxed);
}
/// Returns last compaction timestamp in millis.
pub(crate) fn last_compaction_millis(&self) -> i64 {
self.last_compaction_millis.load(Ordering::Relaxed)
/// Returns last schedule compaction timestamp in millis.
pub(crate) fn last_schedule_compaction_millis(&self) -> i64 {
self.last_schedule_compaction_millis.load(Ordering::Relaxed)
}
/// Update compaction time to current time.
pub(crate) fn update_compaction_millis(&self) {
/// Update schedule compaction time to current time.
pub(crate) fn update_schedule_compaction_millis(&self) {
let now = self.time_provider.current_time_millis();
self.last_compaction_millis.store(now, Ordering::Relaxed);
self.last_schedule_compaction_millis
.store(now, Ordering::Relaxed);
}
/// Returns the table dir.
@@ -1727,7 +1728,7 @@ mod tests {
file_purger: crate::test_util::new_noop_file_purger(),
provider: Provider::noop_provider(),
last_flush_millis: Default::default(),
last_compaction_millis: Default::default(),
last_schedule_compaction_millis: Default::default(),
time_provider: Arc::new(StdTimeProvider),
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
@@ -2084,7 +2085,7 @@ mod tests {
file_purger: crate::test_util::new_noop_file_purger(),
provider: Provider::noop_provider(),
last_flush_millis: Default::default(),
last_compaction_millis: Default::default(),
last_schedule_compaction_millis: Default::default(),
time_provider: Arc::new(StdTimeProvider),
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),

View File

@@ -345,7 +345,7 @@ impl RegionOpener {
),
provider,
last_flush_millis: AtomicI64::new(now),
last_compaction_millis: AtomicI64::new(now),
last_schedule_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0),
written_bytes: Arc::new(AtomicU64::new(0)),
@@ -581,7 +581,7 @@ impl RegionOpener {
file_purger,
provider: provider.clone(),
last_flush_millis: AtomicI64::new(now),
last_compaction_millis: AtomicI64::new(now),
last_schedule_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
written_bytes: Arc::new(AtomicU64::new(0)),

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionCompactRequest;
use store_api::storage::RegionId;
@@ -80,7 +80,6 @@ impl<S> RegionWorkerLoop<S> {
return;
}
};
region.update_compaction_millis();
region.version_control.apply_edit(
Some(request.edit.clone()),
@@ -118,6 +117,31 @@ impl<S> RegionWorkerLoop<S> {
)
.await;
self.handle_ddl_requests(&mut pending_ddls).await;
if self.compaction_scheduler.is_compacting(region_id) {
return;
}
let now = self.time_provider.current_time_millis();
if now - region.last_schedule_compaction_millis()
>= self.config.min_compaction_interval.as_millis() as i64
{
debug!(
"minimal compaction interval time {:?} has passed, scheduling next compaction",
self.config.min_compaction_interval
);
if self
.compaction_scheduler
.schedule_next_compaction(
region_id,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
)
.await
{
region.update_schedule_compaction_millis();
}
}
}
pub(crate) async fn handle_compaction_cancelled(
@@ -160,9 +184,14 @@ impl<S> RegionWorkerLoop<S> {
return;
}
let now = self.time_provider.current_time_millis();
if now - region.last_compaction_millis()
if now - region.last_schedule_compaction_millis()
>= self.config.min_compaction_interval.as_millis() as i64
&& let Err(e) = self
{
debug!(
"minimal compaction interval time {:?} has passed, scheduling next compaction",
self.config.min_compaction_interval
);
match self
.compaction_scheduler
.schedule_compaction(
region.region_id,
@@ -175,11 +204,13 @@ impl<S> RegionWorkerLoop<S> {
1, // Default for automatic compaction
)
.await
{
warn!(
"Failed to schedule compaction for region: {}, err: {}",
region.region_id, e
);
{
Ok(true) => region.update_schedule_compaction_millis(),
Ok(false) => {}
Err(e) => {
error!(e; "Failed to schedule compaction for region: {}", region.region_id)
}
}
}
}
}