From d43e31c7edbbeab2aa452d5efc97fa03e60a6443 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Wed, 4 Sep 2024 18:10:07 +0800 Subject: [PATCH] feat: schedule compaction when adding sst files by editing region (#4648) * feat: schedule compaction when adding sst files by editing region * add minimum time interval for two successive compactions * resolve PR comments --- config/config.md | 2 + config/datanode.example.toml | 4 ++ config/standalone.example.toml | 4 ++ src/mito2/src/compaction.rs | 8 ++- src/mito2/src/config.rs | 6 ++ src/mito2/src/engine/edit_region_test.rs | 88 ++++++++++++++++++++++- src/mito2/src/engine/flush_test.rs | 6 +- src/mito2/src/engine/listener.rs | 3 + src/mito2/src/region.rs | 13 ++++ src/mito2/src/region/opener.rs | 28 ++++---- src/mito2/src/worker.rs | 7 ++ src/mito2/src/worker/handle_catchup.rs | 1 + src/mito2/src/worker/handle_compaction.rs | 34 ++++++++- src/mito2/src/worker/handle_create.rs | 1 + src/mito2/src/worker/handle_flush.rs | 21 +----- src/mito2/src/worker/handle_manifest.rs | 7 ++ src/mito2/src/worker/handle_open.rs | 1 + tests-integration/tests/http.rs | 1 + 18 files changed, 193 insertions(+), 42 deletions(-) diff --git a/config/config.md b/config/config.md index 83bc85612c..f0ee9e54f8 100644 --- a/config/config.md +++ b/config/config.md @@ -129,6 +129,7 @@ | `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: using the default value (1/4 of cpu cores).
- `1`: scan in current thread.
- `n`: scan in parallelism n. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | +| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | @@ -431,6 +432,7 @@ | `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: using the default value (1/4 of cpu cores).
- `1`: scan in current thread.
- `n`: scan in parallelism n. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | +| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 8b55d6e533..07c1df3e2a 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -455,6 +455,10 @@ parallel_scan_channel_size = 32 ## Whether to allow stale WAL entries read during replay. allow_stale_entries = false +## Minimum time interval between two compactions. +## To align with the old behavior, the default value is 0 (no restrictions). +min_compaction_interval = "0m" + ## The options for index in Mito engine. [region_engine.mito.index] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index b065e1a09d..f36c0e2904 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -493,6 +493,10 @@ parallel_scan_channel_size = 32 ## Whether to allow stale WAL entries read during replay. allow_stale_entries = false +## Minimum time interval between two compactions. +## To align with the old behavior, the default value is 0 (no restrictions). +min_compaction_interval = "0m" + ## The options for index in Mito engine. [region_engine.mito.index] diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 9ee2252f2d..0f33471b21 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -160,8 +160,12 @@ impl CompactionScheduler { self.listener.clone(), ); self.region_status.insert(region_id, status); - self.schedule_compaction_request(request, compact_options) - .await + let result = self + .schedule_compaction_request(request, compact_options) + .await; + + self.listener.on_compaction_scheduled(region_id); + result } /// Notifies the scheduler that the compaction job is finished successfully. 
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 7af36ab896..bbdff81ff6 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -122,6 +122,11 @@ pub struct MitoConfig { /// Memtable config pub memtable: MemtableConfig, + + /// Minimum time interval between two compactions. + /// To align with the old behavior, the default value is 0 (no restrictions). + #[serde(with = "humantime_serde")] + pub min_compaction_interval: Duration, } impl Default for MitoConfig { @@ -152,6 +157,7 @@ impl Default for MitoConfig { inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), memtable: MemtableConfig::default(), + min_compaction_interval: Duration::from_secs(0), }; // Adjust buffer and cache size according to system memory if we can. diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index b13691fb85..51f2a976b3 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -15,6 +15,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; +use common_time::util::current_time_millis; use object_store::ObjectStore; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; @@ -22,6 +23,7 @@ use store_api::storage::RegionId; use tokio::sync::{oneshot, Barrier}; use crate::config::MitoConfig; +use crate::engine::flush_test::MockTimeProvider; use crate::engine::listener::EventListener; use crate::engine::MitoEngine; use crate::manifest::action::RegionEdit; @@ -29,6 +31,84 @@ use crate::region::MitoRegionRef; use crate::sst::file::{FileId, FileMeta}; use crate::test_util::{CreateRequestBuilder, TestEnv}; +#[tokio::test] +async fn test_edit_region_schedule_compaction() { + let mut env = TestEnv::new(); + + struct EditRegionListener { + tx: Mutex>>, + } + + impl EventListener for EditRegionListener { + fn on_compaction_scheduled(&self, region_id: RegionId) { + let mut tx = 
self.tx.lock().unwrap(); + tx.take().unwrap().send(region_id).unwrap(); + } + } + + let (tx, mut rx) = oneshot::channel(); + let config = MitoConfig { + min_compaction_interval: Duration::from_secs(60 * 60), + ..Default::default() + }; + let time_provider = Arc::new(MockTimeProvider::new(current_time_millis())); + let engine = env + .create_engine_with_time( + config.clone(), + None, + Some(Arc::new(EditRegionListener { + tx: Mutex::new(Some(tx)), + })), + time_provider.clone(), + ) + .await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + let new_edit = || RegionEdit { + files_to_add: vec![FileMeta { + region_id: region.region_id, + file_id: FileId::random(), + level: 0, + ..Default::default() + }], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }; + engine + .edit_region(region.region_id, new_edit()) + .await + .unwrap(); + // Asserts that the compaction of the region is not scheduled, + // because the minimum time interval between two compactions is not passed. + assert_eq!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty)); + + // Simulates the time has passed the min compaction interval, + time_provider + .set_now(current_time_millis() + config.min_compaction_interval.as_millis() as i64); + // ... then edits the region again, + engine + .edit_region(region.region_id, new_edit()) + .await + .unwrap(); + // ... finally asserts that the compaction of the region is scheduled. 
+ let actual = tokio::time::timeout(Duration::from_secs(9), rx) + .await + .unwrap() + .unwrap(); + assert_eq!(region_id, actual); +} + #[tokio::test] async fn test_edit_region_fill_cache() { let mut env = TestEnv::new(); @@ -151,7 +231,13 @@ async fn test_edit_region_concurrently() { } let mut env = TestEnv::new(); - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + // Suppress the compaction to not impede the speed of this kinda stress testing. + min_compaction_interval: Duration::from_secs(60 * 60), + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); engine diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 52fb46dfab..aac02db91e 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -306,7 +306,7 @@ async fn test_flush_reopen_region(factory: Option) { } #[derive(Debug)] -struct MockTimeProvider { +pub(crate) struct MockTimeProvider { now: AtomicI64, elapsed: AtomicI64, } @@ -326,14 +326,14 @@ impl TimeProvider for MockTimeProvider { } impl MockTimeProvider { - fn new(now: i64) -> Self { + pub(crate) fn new(now: i64) -> Self { Self { now: AtomicI64::new(now), elapsed: AtomicI64::new(0), } } - fn set_now(&self, now: i64) { + pub(crate) fn set_now(&self, now: i64) { self.now.store(now, Ordering::Relaxed); } diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index beea4add1e..a79cb6eafd 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -66,6 +66,9 @@ pub trait EventListener: Send + Sync { /// Notifies the listener that the file cache is filled when, for example, editing region. fn on_file_cache_filled(&self, _file_id: FileId) {} + + /// Notifies the listener that the compaction is scheduled. 
+ fn on_compaction_scheduled(&self, _region_id: RegionId) {} } pub type EventListenerRef = Arc; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 5f945390f9..086fbef7d0 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -102,6 +102,8 @@ pub(crate) struct MitoRegion { pub(crate) provider: Provider, /// Last flush time in millis. last_flush_millis: AtomicI64, + /// Last compaction time in millis. + last_compaction_millis: AtomicI64, /// Provider to get current time. time_provider: TimeProviderRef, /// Memtable builder for the region. @@ -151,6 +153,17 @@ impl MitoRegion { self.last_flush_millis.store(now, Ordering::Relaxed); } + /// Return last compaction time in millis. + pub(crate) fn last_compaction_millis(&self) -> i64 { + self.last_compaction_millis.load(Ordering::Relaxed) + } + + /// Update compaction time to now millis. + pub(crate) fn update_compaction_millis(&self) { + let now = self.time_provider.current_time_millis(); + self.last_compaction_millis.store(now, Ordering::Relaxed); + } + /// Returns the region dir. 
pub(crate) fn region_dir(&self) -> &str { self.access_layer.region_dir() diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index f20da8f3d6..64272a183b 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -49,7 +49,7 @@ use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::time_provider::{StdTimeProvider, TimeProviderRef}; +use crate::time_provider::TimeProviderRef; use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; @@ -66,13 +66,15 @@ pub(crate) struct RegionOpener { skip_wal_replay: bool, puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, - time_provider: Option, + time_provider: TimeProviderRef, stats: ManifestStats, wal_entry_reader: Option>, } impl RegionOpener { /// Returns a new opener. + // TODO(LFC): Reduce the number of arguments. 
+ #[allow(clippy::too_many_arguments)] pub(crate) fn new( region_id: RegionId, region_dir: &str, @@ -81,6 +83,7 @@ impl RegionOpener { purge_scheduler: SchedulerRef, puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, + time_provider: TimeProviderRef, ) -> RegionOpener { RegionOpener { region_id, @@ -94,7 +97,7 @@ impl RegionOpener { skip_wal_replay: false, puffin_manager_factory, intermediate_manager, - time_provider: None, + time_provider, stats: Default::default(), wal_entry_reader: None, } @@ -223,9 +226,7 @@ impl RegionOpener { self.puffin_manager_factory, self.intermediate_manager, )); - let time_provider = self - .time_provider - .unwrap_or_else(|| Arc::new(StdTimeProvider)); + let now = self.time_provider.current_time_millis(); Ok(MitoRegion { region_id, @@ -242,8 +243,9 @@ impl RegionOpener { self.cache_manager, )), provider, - last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), - time_provider, + last_flush_millis: AtomicI64::new(now), + last_compaction_millis: AtomicI64::new(now), + time_provider: self.time_provider.clone(), memtable_builder, stats: self.stats, }) @@ -377,10 +379,7 @@ impl RegionOpener { } else { info!("Skip the WAL replay for region: {}", region_id); } - let time_provider = self - .time_provider - .clone() - .unwrap_or_else(|| Arc::new(StdTimeProvider)); + let now = self.time_provider.current_time_millis(); let region = MitoRegion { region_id: self.region_id, @@ -393,8 +392,9 @@ impl RegionOpener { )), file_purger, provider: provider.clone(), - last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), - time_provider, + last_flush_millis: AtomicI64::new(now), + last_compaction_millis: AtomicI64::new(now), + time_provider: self.time_provider.clone(), memtable_builder, stats: self.stats.clone(), }; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 242d48c45f..0f872e24e4 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -958,6 
+958,13 @@ impl WorkerListener { listener.on_file_cache_filled(_file_id); } } + + pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_compaction_scheduled(_region_id); + } + } } #[cfg(test)] diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 6d16d72c1c..bee00fae5e 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -56,6 +56,7 @@ impl RegionWorkerLoop { self.purge_scheduler.clone(), self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), + self.time_provider.clone(), ) .cache(Some(self.cache_manager.clone())) .options(region.version().options.clone())? diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 080c359784..e088932035 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_telemetry::{error, info}; -use store_api::logstore::LogStore; +use api::v1::region::compact_request; +use common_telemetry::{error, info, warn}; use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; use crate::error::RegionNotFoundSnafu; use crate::metrics::COMPACTION_REQUEST_COUNT; +use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; -impl RegionWorkerLoop { +impl RegionWorkerLoop { /// Handles compaction request submitted to region worker. 
pub(crate) async fn handle_compaction_request( &mut self, @@ -68,6 +69,7 @@ impl RegionWorkerLoop { return; } }; + region.update_compaction_millis(); region .version_control @@ -89,4 +91,30 @@ impl RegionWorkerLoop { self.compaction_scheduler .on_compaction_failed(req.region_id, req.err); } + + /// Schedule compaction for the region if necessary. + pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) { + let now = self.time_provider.current_time_millis(); + if now - region.last_compaction_millis() + >= self.config.min_compaction_interval.as_millis() as i64 + { + if let Err(e) = self + .compaction_scheduler + .schedule_compaction( + region.region_id, + compact_request::Options::Regular(Default::default()), + &region.version_control, + &region.access_layer, + OptionOutputTx::none(), + &region.manifest_ctx, + ) + .await + { + warn!( + "Failed to schedule compaction for region: {}, err: {}", + region.region_id, e + ); + } + } + } } diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 863435c7ea..200ed1913f 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -62,6 +62,7 @@ impl RegionWorkerLoop { self.purge_scheduler.clone(), self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), + self.time_provider.clone(), ) .metadata(metadata) .parse_options(request.options)? diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 8acb289b24..14a70225bb 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -16,8 +16,7 @@ use std::sync::Arc; -use api::v1::region::compact_request; -use common_telemetry::{error, info, warn}; +use common_telemetry::{error, info}; use store_api::logstore::LogStore; use store_api::region_request::RegionFlushRequest; use store_api::storage::RegionId; @@ -242,23 +241,7 @@ impl RegionWorkerLoop { self.handle_stalled_requests().await; // Schedules compaction. 
- if let Err(e) = self - .compaction_scheduler - .schedule_compaction( - region.region_id, - compact_request::Options::Regular(Default::default()), - &region.version_control, - &region.access_layer, - OptionOutputTx::none(), - &region.manifest_ctx, - ) - .await - { - warn!( - "Failed to schedule compaction after flush, region: {}, err: {}", - region.region_id, e - ); - } + self.schedule_compaction(&region).await; self.listener.on_flush_success(region_id); } diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index a7ebb219ce..de5f4e563d 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -148,6 +148,9 @@ impl RegionWorkerLoop { } }; + let need_compaction = + edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty(); + if edit_result.result.is_ok() { // Applies the edit to the region. region @@ -165,6 +168,10 @@ impl RegionWorkerLoop { self.handle_region_edit(request).await; } } + + if need_compaction { + self.schedule_compaction(&region).await; + } } /// Writes truncate action to the manifest and then applies it to the region in background. 
diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index fa4f487040..925e2dc8f5 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -97,6 +97,7 @@ impl RegionWorkerLoop { self.purge_scheduler.clone(), self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), + self.time_provider.clone(), ) .skip_wal_replay(request.skip_wal_replay) .cache(Some(self.cache_manager.clone())) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7b4164023e..497ea4969c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -841,6 +841,7 @@ experimental_write_cache_size = "512MiB" sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 allow_stale_entries = false +min_compaction_interval = "0s" [region_engine.mito.index] aux_path = ""