diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 5cf3c444a8..c91809e253 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -150,6 +150,7 @@ impl CompactionScheduler { } /// Schedules a compaction for the region. + /// Returns whether a compaction is scheduled. #[allow(clippy::too_many_arguments)] pub(crate) async fn schedule_compaction( &mut self, @@ -161,7 +162,7 @@ impl CompactionScheduler { manifest_ctx: &ManifestContextRef, schema_metadata_manager: SchemaMetadataManagerRef, max_parallelism: usize, - ) -> Result<()> { + ) -> Result { // skip compaction if region is in staging state let current_state = manifest_ctx.current_state(); if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) { @@ -170,7 +171,7 @@ impl CompactionScheduler { region_id, compact_options ); waiter.send(Ok(0)); - return Ok(()); + return Ok(false); } if let Some(status) = self.region_status.get_mut(®ion_id) { @@ -192,7 +193,7 @@ impl CompactionScheduler { ); } } - return Ok(()); + return Ok(false); } // The region can compact directly. @@ -209,7 +210,7 @@ impl CompactionScheduler { max_parallelism, ); - let result = match self + match self .schedule_compaction_request(request, compact_options) .await { @@ -220,14 +221,12 @@ impl CompactionScheduler { status.active_compaction = Some(active_compaction); self.region_status.insert(region_id, status); - Ok(()) + self.listener.on_compaction_scheduled(region_id); + Ok(true) } - Ok(None) => Ok(()), + Ok(None) => Ok(false), Err(e) => Err(e), - }; - - self.listener.on_compaction_scheduled(region_id); - result + } } // Handle pending manual compaction request for the region. @@ -334,6 +333,27 @@ impl CompactionScheduler { // And skip try to schedule next compaction task. return pending_ddl_requests; } + Vec::new() + } + + pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool { + self.region_status + .get(®ion_id) + .map(|status| status.active_compaction.is_some()) + .unwrap_or(false) + } + + /// Schedules next compaction upon a finished compaction. + /// Returns whether the compaction is scheduled. + pub(crate) async fn schedule_next_compaction( + &mut self, + region_id: RegionId, + manifest_ctx: &ManifestContextRef, + schema_metadata_manager: SchemaMetadataManagerRef, + ) -> bool { + let Some(status) = self.region_status.get_mut(®ion_id) else { + return false; + }; // We should always try to compact the region until picker returns None. let request = status.new_compaction_request( @@ -364,20 +384,21 @@ impl CompactionScheduler { "Successfully scheduled next compaction for region id: {}", region_id ); + true } Ok(None) => { // No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region. // All DDL requests and pending compaction requests have already been processed. // Safe to remove the region from status tracking. self.region_status.remove(®ion_id); + false } Err(e) => { error!(e; "Failed to schedule next compaction for region {}", region_id); self.remove_region_on_failure(region_id, Arc::new(e)); + false } } - - Vec::new() } /// Notifies the scheduler that the compaction job is cancelled cooperatively. @@ -1434,7 +1455,7 @@ mod tests { let manifest_ctx = env .mock_manifest_context(version_control.current().version.metadata.clone()) .await; - scheduler + let scheduled = scheduler .schedule_compaction( builder.region_id(), compact_request::Options::Regular(Default::default()), @@ -1447,6 +1468,7 @@ mod tests { ) .await .unwrap(); + assert!(!scheduled); let output = output_rx.await.unwrap().unwrap(); assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); @@ -1455,7 +1477,7 @@ mod tests { let version_control = Arc::new(builder.push_l0_file(0, 1000).build()); let (output_tx, output_rx) = oneshot::channel(); let waiter = OptionOutputTx::from(output_tx); - scheduler + let scheduled = scheduler .schedule_compaction( builder.region_id(), compact_request::Options::Regular(Default::default()), @@ -1468,11 +1490,67 @@ mod tests { ) .await .unwrap(); + assert!(!scheduled); let output = output_rx.await.unwrap().unwrap(); assert_eq!(output, 0); assert!(scheduler.region_status.is_empty()); } + #[tokio::test] + async fn test_schedule_compaction_returns_true_when_task_scheduled() { + let job_scheduler = Arc::new(VecScheduler::default()); + let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let mut builder = VersionControlBuilder::new(); + let region_id = builder.region_id(); + let end = 1000 * 1000; + // Five overlapping L0 files are enough for the regular picker to create a task. + let version_control = Arc::new( + builder + .push_l0_file(0, end) + .push_l0_file(10, end) + .push_l0_file(50, end) + .push_l0_file(80, end) + .push_l0_file(90, end) + .build(), + ); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager(); + schema_metadata_manager + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + kv_backend, + ) + .await; + + let scheduled = scheduler + .schedule_compaction( + region_id, + Options::Regular(Default::default()), + &version_control, + &env.access_layer, + OptionOutputTx::none(), + &manifest_ctx, + schema_metadata_manager, + 1, + ) + .await + .unwrap(); + + // The boolean result is what the worker uses to decide whether to update + // last_schedule_compaction_millis. + assert!(scheduled); + assert_eq!(1, job_scheduler.num_jobs()); + assert!(scheduler.region_status.contains_key(®ion_id)); + } + #[tokio::test] async fn test_schedule_on_finished() { common_telemetry::init_default_ut_logging(); @@ -1510,7 +1588,7 @@ mod tests { let manifest_ctx = env .mock_manifest_context(version_control.current().version.metadata.clone()) .await; - scheduler + let scheduled = scheduler .schedule_compaction( region_id, compact_request::Options::Regular(Default::default()), @@ -1524,6 +1602,7 @@ mod tests { .await .unwrap(); // Should schedule 1 compaction. + assert!(scheduled); assert_eq!(1, scheduler.region_status.len()); assert_eq!(1, job_scheduler.num_jobs()); let data = version_control.current(); @@ -1542,7 +1621,7 @@ mod tests { ); // The task is pending. let (tx, _rx) = oneshot::channel(); - scheduler + let scheduled = scheduler .schedule_compaction( region_id, compact_request::Options::Regular(Default::default()), @@ -1555,6 +1634,7 @@ mod tests { ) .await .unwrap(); + assert!(!scheduled); assert_eq!(1, scheduler.region_status.len()); assert_eq!(1, job_scheduler.num_jobs()); assert!( @@ -1570,6 +1650,10 @@ mod tests { scheduler .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone()) .await; + let scheduled = scheduler + .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager.clone()) + .await; + assert!(scheduled); assert_eq!(1, scheduler.region_status.len()); assert_eq!(2, job_scheduler.num_jobs()); @@ -1582,7 +1666,7 @@ mod tests { ); let (tx, _rx) = oneshot::channel(); // The task is pending. - scheduler + let scheduled = scheduler .schedule_compaction( region_id, compact_request::Options::Regular(Default::default()), @@ -1595,6 +1679,7 @@ mod tests { ) .await .unwrap(); + assert!(!scheduled); assert_eq!(2, job_scheduler.num_jobs()); assert!( !scheduler @@ -2328,6 +2413,15 @@ mod tests { .await; assert!(pending_ddls.is_empty()); + assert!(scheduler.region_status.contains_key(®ion_id)); + + let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + // With no compactable files, next scheduling returns false and removes + // the status without creating a background task. + let scheduled = scheduler + .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager) + .await; + assert!(!scheduled); assert!(!scheduler.region_status.contains_key(®ion_id)); } @@ -2370,6 +2464,14 @@ mod tests { .await; assert!(pending_ddls.is_empty()); + assert!(scheduler.region_status.contains_key(®ion_id)); + + let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + // The failing scheduler simulates a submit error; callers must see false. + let scheduled = scheduler + .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager) + .await; + assert!(!scheduled); assert!(!scheduler.region_status.contains_key(®ion_id)); } diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index e05e1a847a..1d9bf2101b 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -21,6 +21,7 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::DfRecordBatch; use common_test_util::flight::encode_to_flight_data; +use common_time::Timestamp; use common_time::util::current_time_millis; use datatypes::arrow::array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArray}; use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; @@ -67,7 +68,8 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) { default_flat_format: flat_format, ..Default::default() }; - let time_provider = Arc::new(MockTimeProvider::new(current_time_millis())); + let initial_time = current_time_millis(); + let time_provider = Arc::new(MockTimeProvider::new(initial_time)); let engine = env .create_engine_with_time( config.clone(), @@ -99,14 +101,22 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) { .await .unwrap(); let region = engine.get_region(region_id).unwrap(); + let initial_schedule_time = region.last_schedule_compaction_millis(); + assert_eq!(initial_time, initial_schedule_time); - let new_edit = || RegionEdit { - files_to_add: vec![FileMeta { - region_id: region.region_id, - file_id: FileId::random(), - level: 0, - ..Default::default() - }], + let new_edit = |file_starts: &[i64]| RegionEdit { + files_to_add: file_starts + .iter() + .map(|start| FileMeta { + region_id: region.region_id, + file_id: FileId::random(), + time_range: ( + Timestamp::new_millisecond(*start), + Timestamp::new_millisecond(1000 * 1000), + ), + ..Default::default() + }) + .collect(), files_to_remove: vec![], timestamp_ms: None, compaction_time_window: None, @@ -115,19 +125,23 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) { committed_sequence: None, }; engine - .edit_region(region.region_id, new_edit()) + .edit_region(region.region_id, new_edit(&[0, 10, 50, 80])) .await .unwrap(); // Asserts that the compaction of the region is not scheduled, // because the minimum time interval between two compactions is not passed. assert_eq!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty)); + assert_eq!( + initial_schedule_time, + region.last_schedule_compaction_millis() + ); // Simulates the time has passed the min compaction interval, - time_provider - .set_now(current_time_millis() + config.min_compaction_interval.as_millis() as i64); + let next_schedule_time = initial_time + config.min_compaction_interval.as_millis() as i64; + time_provider.set_now(next_schedule_time); // ... then edits the region again, engine - .edit_region(region.region_id, new_edit()) + .edit_region(region.region_id, new_edit(&[90])) .await .unwrap(); // ... finally asserts that the compaction of the region is scheduled. @@ -136,6 +150,9 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) { .unwrap() .unwrap(); assert_eq!(region_id, actual); + // Wait for the `last_schedule_compaction_millis` to update. + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(next_schedule_time, region.last_schedule_compaction_millis()); } #[tokio::test] diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 9d214caed3..4acec5a893 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -157,8 +157,8 @@ pub struct MitoRegion { pub(crate) provider: Provider, /// Last flush time in millis. last_flush_millis: AtomicI64, - /// Last compaction time in millis. - last_compaction_millis: AtomicI64, + /// Last schedule compaction time in millis. + last_schedule_compaction_millis: AtomicI64, /// Provider to get current time. time_provider: TimeProviderRef, /// The topic's latest entry id since the region's last flushing. @@ -251,15 +251,16 @@ impl MitoRegion { self.last_flush_millis.store(now, Ordering::Relaxed); } - /// Returns last compaction timestamp in millis. - pub(crate) fn last_compaction_millis(&self) -> i64 { - self.last_compaction_millis.load(Ordering::Relaxed) + /// Returns last schedule compaction timestamp in millis. + pub(crate) fn last_schedule_compaction_millis(&self) -> i64 { + self.last_schedule_compaction_millis.load(Ordering::Relaxed) } - /// Update compaction time to current time. - pub(crate) fn update_compaction_millis(&self) { + /// Update schedule compaction time to current time. + pub(crate) fn update_schedule_compaction_millis(&self) { let now = self.time_provider.current_time_millis(); - self.last_compaction_millis.store(now, Ordering::Relaxed); + self.last_schedule_compaction_millis + .store(now, Ordering::Relaxed); } /// Returns the table dir. @@ -1727,7 +1728,7 @@ mod tests { file_purger: crate::test_util::new_noop_file_purger(), provider: Provider::noop_provider(), last_flush_millis: Default::default(), - last_compaction_millis: Default::default(), + last_schedule_compaction_millis: Default::default(), time_provider: Arc::new(StdTimeProvider), topic_latest_entry_id: Default::default(), written_bytes: Arc::new(AtomicU64::new(0)), @@ -2084,7 +2085,7 @@ mod tests { file_purger: crate::test_util::new_noop_file_purger(), provider: Provider::noop_provider(), last_flush_millis: Default::default(), - last_compaction_millis: Default::default(), + last_schedule_compaction_millis: Default::default(), time_provider: Arc::new(StdTimeProvider), topic_latest_entry_id: Default::default(), written_bytes: Arc::new(AtomicU64::new(0)), diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 6785181b48..3142a87c38 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -345,7 +345,7 @@ impl RegionOpener { ), provider, last_flush_millis: AtomicI64::new(now), - last_compaction_millis: AtomicI64::new(now), + last_schedule_compaction_millis: AtomicI64::new(now), time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(0), written_bytes: Arc::new(AtomicU64::new(0)), @@ -581,7 +581,7 @@ impl RegionOpener { file_purger, provider: provider.clone(), last_flush_millis: AtomicI64::new(now), - last_compaction_millis: AtomicI64::new(now), + last_schedule_compaction_millis: AtomicI64::new(now), time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id), written_bytes: Arc::new(AtomicU64::new(0)), diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index fd021771b1..3abdf0e349 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::region::compact_request; -use common_telemetry::{error, info, warn}; +use common_telemetry::{debug, error, info}; use store_api::logstore::LogStore; use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; @@ -80,7 +80,6 @@ impl RegionWorkerLoop { return; } }; - region.update_compaction_millis(); region.version_control.apply_edit( Some(request.edit.clone()), @@ -118,6 +117,31 @@ impl RegionWorkerLoop { ) .await; self.handle_ddl_requests(&mut pending_ddls).await; + + if self.compaction_scheduler.is_compacting(region_id) { + return; + } + + let now = self.time_provider.current_time_millis(); + if now - region.last_schedule_compaction_millis() + >= self.config.min_compaction_interval.as_millis() as i64 + { + debug!( + "minimal compaction interval time {:?} has passed, scheduling next compaction", + self.config.min_compaction_interval + ); + if self + .compaction_scheduler + .schedule_next_compaction( + region_id, + ®ion.manifest_ctx, + self.schema_metadata_manager.clone(), + ) + .await + { + region.update_schedule_compaction_millis(); + } + } } pub(crate) async fn handle_compaction_cancelled( @@ -160,9 +184,14 @@ impl RegionWorkerLoop { return; } let now = self.time_provider.current_time_millis(); - if now - region.last_compaction_millis() + if now - region.last_schedule_compaction_millis() >= self.config.min_compaction_interval.as_millis() as i64 - && let Err(e) = self + { + debug!( + "minimal compaction interval time {:?} has passed, scheduling next compaction", + self.config.min_compaction_interval + ); + match self .compaction_scheduler .schedule_compaction( region.region_id, @@ -175,11 +204,13 @@ impl RegionWorkerLoop { 1, // Default for automatic compaction ) .await - { - warn!( - "Failed to schedule compaction for region: {}, err: {}", - region.region_id, e - ); + { + Ok(true) => region.update_schedule_compaction_millis(), + Ok(false) => {} + Err(e) => { + error!(e; "Failed to schedule compaction for region: {}", region.region_id) + } + } } } }