feat: schedule compaction when adding sst files by editing region (#4648)

* feat: schedule compaction when adding sst files by editing region

* add a minimum time interval between two successive compactions

* resolve PR comments
Author: LFC
Date: 2024-09-04 18:10:07 +08:00
Committed by: GitHub
Parent: 19e2a9d44b
Commit: d43e31c7ed
18 changed files with 193 additions and 42 deletions

View File

@@ -129,6 +129,7 @@
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -431,6 +432,7 @@
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
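The new option takes a human-readable duration string; as the `MitoConfig` change below shows, it is deserialized through `humantime_serde`. A minimal sketch of how such strings map to `std::time::Duration`, assuming the `humantime` crate that backs `humantime_serde` (illustrative values, not defaults):

```rust
use std::time::Duration;

fn main() {
    // "0m" keeps the old behavior (no throttling between compactions).
    assert_eq!(humantime::parse_duration("0m").unwrap(), Duration::from_secs(0));
    // Non-zero intervals throttle how often a region can be compacted.
    assert_eq!(humantime::parse_duration("30m").unwrap(), Duration::from_secs(30 * 60));
    assert_eq!(humantime::parse_duration("1h").unwrap(), Duration::from_secs(60 * 60));
}
```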

View File

@@ -455,6 +455,10 @@ parallel_scan_channel_size = 32
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## The options for index in Mito engine.
[region_engine.mito.index]

View File

@@ -493,6 +493,10 @@ parallel_scan_channel_size = 32
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## The options for index in Mito engine.
[region_engine.mito.index]

View File

@@ -160,8 +160,12 @@ impl CompactionScheduler {
self.listener.clone(),
);
self.region_status.insert(region_id, status);
self.schedule_compaction_request(request, compact_options)
.await
let result = self
.schedule_compaction_request(request, compact_options)
.await;
self.listener.on_compaction_scheduled(region_id);
result
}
/// Notifies the scheduler that the compaction job is finished successfully.

View File

@@ -122,6 +122,11 @@ pub struct MitoConfig {
/// Memtable config
pub memtable: MemtableConfig,
/// Minimum time interval between two compactions.
/// To align with the old behavior, the default value is 0 (no restrictions).
#[serde(with = "humantime_serde")]
pub min_compaction_interval: Duration,
}
impl Default for MitoConfig {
@@ -152,6 +157,7 @@ impl Default for MitoConfig {
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
};
// Adjust buffer and cache size according to system memory if we can.
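For callers that build the config in Rust rather than TOML (as the tests later in this commit do), only the new field needs to be set explicitly. A minimal sketch, reusing the crate-internal `crate::config::MitoConfig` path imported elsewhere in this commit; the helper itself is illustrative and not part of the change:

```rust
use std::time::Duration;

use crate::config::MitoConfig;

/// Illustrative helper: throttle scheduling to at most one compaction per
/// region per hour, keeping every other field at its default.
fn hourly_compaction_config() -> MitoConfig {
    MitoConfig {
        min_compaction_interval: Duration::from_secs(60 * 60),
        ..Default::default()
    }
}
```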

View File

@@ -15,6 +15,7 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_time::util::current_time_millis;
use object_store::ObjectStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
@@ -22,6 +23,7 @@ use store_api::storage::RegionId;
use tokio::sync::{oneshot, Barrier};
use crate::config::MitoConfig;
use crate::engine::flush_test::MockTimeProvider;
use crate::engine::listener::EventListener;
use crate::engine::MitoEngine;
use crate::manifest::action::RegionEdit;
@@ -29,6 +31,84 @@ use crate::region::MitoRegionRef;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_edit_region_schedule_compaction() {
let mut env = TestEnv::new();
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<RegionId>>>,
}
impl EventListener for EditRegionListener {
fn on_compaction_scheduled(&self, region_id: RegionId) {
let mut tx = self.tx.lock().unwrap();
tx.take().unwrap().send(region_id).unwrap();
}
}
let (tx, mut rx) = oneshot::channel();
let config = MitoConfig {
min_compaction_interval: Duration::from_secs(60 * 60),
..Default::default()
};
let time_provider = Arc::new(MockTimeProvider::new(current_time_millis()));
let engine = env
.create_engine_with_time(
config.clone(),
None,
Some(Arc::new(EditRegionListener {
tx: Mutex::new(Some(tx)),
})),
time_provider.clone(),
)
.await;
let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let new_edit = || RegionEdit {
files_to_add: vec![FileMeta {
region_id: region.region_id,
file_id: FileId::random(),
level: 0,
..Default::default()
}],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
engine
.edit_region(region.region_id, new_edit())
.await
.unwrap();
// Asserts that the compaction of the region is not scheduled,
// because the minimum time interval between two compactions has not elapsed yet.
assert_eq!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty));
// Simulates that the min compaction interval has elapsed,
time_provider
.set_now(current_time_millis() + config.min_compaction_interval.as_millis() as i64);
// ... then edits the region again,
engine
.edit_region(region.region_id, new_edit())
.await
.unwrap();
// ... finally asserts that the compaction of the region is scheduled.
let actual = tokio::time::timeout(Duration::from_secs(9), rx)
.await
.unwrap()
.unwrap();
assert_eq!(region_id, actual);
}
#[tokio::test]
async fn test_edit_region_fill_cache() {
let mut env = TestEnv::new();
@@ -151,7 +231,13 @@ async fn test_edit_region_concurrently() {
}
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let engine = env
.create_engine(MitoConfig {
// Suppress compaction so it does not slow down this kind of stress testing.
min_compaction_interval: Duration::from_secs(60 * 60),
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
engine

View File

@@ -306,7 +306,7 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
}
#[derive(Debug)]
struct MockTimeProvider {
pub(crate) struct MockTimeProvider {
now: AtomicI64,
elapsed: AtomicI64,
}
@@ -326,14 +326,14 @@ impl TimeProvider for MockTimeProvider {
}
impl MockTimeProvider {
fn new(now: i64) -> Self {
pub(crate) fn new(now: i64) -> Self {
Self {
now: AtomicI64::new(now),
elapsed: AtomicI64::new(0),
}
}
fn set_now(&self, now: i64) {
pub(crate) fn set_now(&self, now: i64) {
self.now.store(now, Ordering::Relaxed);
}

View File

@@ -66,6 +66,9 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that the file cache is filled when, for example, editing region.
fn on_file_cache_filled(&self, _file_id: FileId) {}
/// Notifies the listener that the compaction is scheduled.
fn on_compaction_scheduled(&self, _region_id: RegionId) {}
}
pub type EventListenerRef = Arc<dyn EventListener>;

View File

@@ -102,6 +102,8 @@ pub(crate) struct MitoRegion {
pub(crate) provider: Provider,
/// Last flush time in millis.
last_flush_millis: AtomicI64,
/// Last compaction time in millis.
last_compaction_millis: AtomicI64,
/// Provider to get current time.
time_provider: TimeProviderRef,
/// Memtable builder for the region.
@@ -151,6 +153,17 @@ impl MitoRegion {
self.last_flush_millis.store(now, Ordering::Relaxed);
}
/// Returns the last compaction time in millis.
pub(crate) fn last_compaction_millis(&self) -> i64 {
self.last_compaction_millis.load(Ordering::Relaxed)
}
/// Update compaction time to now millis.
pub(crate) fn update_compaction_millis(&self) {
let now = self.time_provider.current_time_millis();
self.last_compaction_millis.store(now, Ordering::Relaxed);
}
/// Returns the region dir.
pub(crate) fn region_dir(&self) -> &str {
self.access_layer.region_dir()
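The bookkeeping mirrors the existing `last_flush_millis` field: a plain `AtomicI64` read and written with `Ordering::Relaxed`, presumably sufficient because the timestamp is only an advisory throttle input rather than a synchronization point (that reasoning is an assumption, not stated in the commit). A self-contained sketch of the pattern, using a hypothetical stand-in type rather than `MitoRegion` itself:

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Stripped-down sketch of the per-region timestamp bookkeeping.
struct CompactionClock {
    last_compaction_millis: AtomicI64,
}

impl CompactionClock {
    /// Records that a compaction was just accepted, using the caller's clock.
    fn record_compaction(&self, now_millis: i64) {
        self.last_compaction_millis.store(now_millis, Ordering::Relaxed);
    }

    /// Reads the last recorded compaction time in millis.
    fn last_compaction_millis(&self) -> i64 {
        self.last_compaction_millis.load(Ordering::Relaxed)
    }
}
```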

View File

@@ -49,7 +49,7 @@ use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};
@@ -66,13 +66,15 @@ pub(crate) struct RegionOpener {
skip_wal_replay: bool,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
time_provider: TimeProviderRef,
stats: ManifestStats,
wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}
impl RegionOpener {
/// Returns a new opener.
// TODO(LFC): Reduce the number of arguments.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
region_id: RegionId,
region_dir: &str,
@@ -81,6 +83,7 @@ impl RegionOpener {
purge_scheduler: SchedulerRef,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
time_provider: TimeProviderRef,
) -> RegionOpener {
RegionOpener {
region_id,
@@ -94,7 +97,7 @@ impl RegionOpener {
skip_wal_replay: false,
puffin_manager_factory,
intermediate_manager,
time_provider: None,
time_provider,
stats: Default::default(),
wal_entry_reader: None,
}
@@ -223,9 +226,7 @@ impl RegionOpener {
self.puffin_manager_factory,
self.intermediate_manager,
));
let time_provider = self
.time_provider
.unwrap_or_else(|| Arc::new(StdTimeProvider));
let now = self.time_provider.current_time_millis();
Ok(MitoRegion {
region_id,
@@ -242,8 +243,9 @@ impl RegionOpener {
self.cache_manager,
)),
provider,
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
last_flush_millis: AtomicI64::new(now),
last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
memtable_builder,
stats: self.stats,
})
@@ -377,10 +379,7 @@ impl RegionOpener {
} else {
info!("Skip the WAL replay for region: {}", region_id);
}
let time_provider = self
.time_provider
.clone()
.unwrap_or_else(|| Arc::new(StdTimeProvider));
let now = self.time_provider.current_time_millis();
let region = MitoRegion {
region_id: self.region_id,
@@ -393,8 +392,9 @@ impl RegionOpener {
)),
file_purger,
provider: provider.clone(),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
last_flush_millis: AtomicI64::new(now),
last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
memtable_builder,
stats: self.stats.clone(),
};

View File

@@ -958,6 +958,13 @@ impl WorkerListener {
listener.on_file_cache_filled(_file_id);
}
}
pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_compaction_scheduled(_region_id);
}
}
}
#[cfg(test)]

View File

@@ -56,6 +56,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())?

View File

@@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use store_api::region_request::RegionCompactRequest;
use store_api::storage::RegionId;
use crate::error::RegionNotFoundSnafu;
use crate::metrics::COMPACTION_REQUEST_COUNT;
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
impl<S> RegionWorkerLoop<S> {
/// Handles compaction request submitted to region worker.
pub(crate) async fn handle_compaction_request(
&mut self,
@@ -68,6 +69,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
};
region.update_compaction_millis();
region
.version_control
@@ -89,4 +91,30 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.compaction_scheduler
.on_compaction_failed(req.region_id, req.err);
}
/// Schedules a compaction for the region if the minimum compaction interval has elapsed since its last compaction.
pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
let now = self.time_provider.current_time_millis();
if now - region.last_compaction_millis()
>= self.config.min_compaction_interval.as_millis() as i64
{
if let Err(e) = self
.compaction_scheduler
.schedule_compaction(
region.region_id,
compact_request::Options::Regular(Default::default()),
&region.version_control,
&region.access_layer,
OptionOutputTx::none(),
&region.manifest_ctx,
)
.await
{
warn!(
"Failed to schedule compaction for region: {}, err: {}",
region.region_id, e
);
}
}
}
}
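Both the flush path and the new edit path funnel through this helper, so the throttle boils down to a single comparison against the configured interval. A condensed, free-standing sketch of that check (hypothetical function; the real code reads the timestamp from the region and the interval from `MitoConfig`):

```rust
use std::time::Duration;

/// Returns true when enough time has passed since the region's last
/// compaction for a new one to be scheduled. With the default
/// `min_compaction_interval` of 0, this is always true (the old behavior).
fn interval_elapsed(
    now_millis: i64,
    last_compaction_millis: i64,
    min_compaction_interval: Duration,
) -> bool {
    now_millis - last_compaction_millis >= min_compaction_interval.as_millis() as i64
}
```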

View File

@@ -62,6 +62,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.metadata(metadata)
.parse_options(request.options)?

View File

@@ -16,8 +16,7 @@
use std::sync::Arc;
use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;
@@ -242,23 +241,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_stalled_requests().await;
// Schedules compaction.
if let Err(e) = self
.compaction_scheduler
.schedule_compaction(
region.region_id,
compact_request::Options::Regular(Default::default()),
&region.version_control,
&region.access_layer,
OptionOutputTx::none(),
&region.manifest_ctx,
)
.await
{
warn!(
"Failed to schedule compaction after flush, region: {}, err: {}",
region.region_id, e
);
}
self.schedule_compaction(&region).await;
self.listener.on_flush_success(region_id);
}

View File

@@ -148,6 +148,9 @@ impl<S> RegionWorkerLoop<S> {
}
};
let need_compaction =
edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
if edit_result.result.is_ok() {
// Applies the edit to the region.
region
@@ -165,6 +168,10 @@ impl<S> RegionWorkerLoop<S> {
self.handle_region_edit(request).await;
}
}
if need_compaction {
self.schedule_compaction(&region).await;
}
}
/// Writes truncate action to the manifest and then applies it to the region in background.

View File

@@ -97,6 +97,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.skip_wal_replay(request.skip_wal_replay)
.cache(Some(self.cache_manager.clone()))

View File

@@ -841,6 +841,7 @@ experimental_write_cache_size = "512MiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
min_compaction_interval = "0s"
[region_engine.mito.index]
aux_path = ""