feat: set default compaction parallelism (#5371)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Author: Ruihang Xia
Date: 2025-01-16 19:16:56 +08:00
Committed by: Yingwen
Parent: 27918686d6
Commit: 1d1bb83a9f

5 changed files with 30 additions and 4 deletions
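
This change threads a per-task `max_parallelism` setting from the compaction scheduler down to the compactor: `CompactionRequest`, `CompactionRegion`, and `OpenCompactionRegionRequest` gain a `max_parallelism` field, and `DefaultCompactor` chunks its work by that value instead of by the global `MAX_PARALLEL_COMPACTION` constant, which drops from 8 to 1 and now acts as the default. The region worker currently passes 1, with a TODO to expose the setting to the frontend.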

View File

@@ -40,6 +40,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use table::predicate::Predicate;
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
@@ -85,6 +86,7 @@ pub struct CompactionRequest {
pub(crate) manifest_ctx: ManifestContextRef,
pub(crate) listener: WorkerListener,
pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
pub(crate) max_parallelism: usize,
}
impl CompactionRequest {
@@ -145,6 +147,7 @@ impl CompactionScheduler {
waiter: OptionOutputTx,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
@@ -163,6 +166,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
max_parallelism,
);
self.region_status.insert(region_id, status);
let result = self
@@ -193,6 +197,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
MAX_PARALLEL_COMPACTION,
);
// Try to schedule next compaction task for this region.
if let Err(e) = self
@@ -264,6 +269,7 @@ impl CompactionScheduler {
manifest_ctx,
listener,
schema_metadata_manager,
max_parallelism,
} = request;
let ttl = find_ttl(
@@ -294,6 +300,7 @@ impl CompactionScheduler {
manifest_ctx: manifest_ctx.clone(),
file_purger: None,
ttl: Some(ttl),
max_parallelism,
};
let picker_output = {
@@ -521,6 +528,7 @@ impl CompactionStatus {
manifest_ctx: &ManifestContextRef,
listener: WorkerListener,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> CompactionRequest {
let current_version = CompactionVersion::from(self.version_control.current().version);
let start_time = Instant::now();
@@ -535,6 +543,7 @@ impl CompactionStatus {
manifest_ctx: manifest_ctx.clone(),
listener,
schema_metadata_manager,
max_parallelism,
};
if let Some(pending) = self.pending_compaction.take() {
@@ -722,6 +731,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -742,6 +752,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
@@ -795,6 +806,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -825,6 +837,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -860,6 +873,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
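
In the scheduler above, `max_parallelism` enters through the scheduling entry point, is carried on `CompactionRequest`, and is forwarded into the `CompactionRegion` handed to the picker and compactor. The rescheduling path, which has no caller-supplied value, falls back to the `MAX_PARALLEL_COMPACTION` default.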

View File

@@ -91,6 +91,12 @@ pub struct CompactionRegion {
pub(crate) current_version: CompactionVersion,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
pub(crate) ttl: Option<TimeToLive>,
/// Controls the parallelism of this compaction task. Default is 1.
///
/// The parallelism is within this single compaction task, not across different
/// compaction tasks. For example, different time windows of the same compaction
/// task may be processed in parallel.
pub max_parallelism: usize,
}
/// OpenCompactionRegionRequest represents the request to open a compaction region.
@@ -99,6 +105,7 @@ pub struct OpenCompactionRegionRequest {
pub region_id: RegionId,
pub region_dir: String,
pub region_options: RegionOptions,
pub max_parallelism: usize,
}
/// Open a compaction region from a compaction request.
@@ -205,6 +212,7 @@ pub async fn open_compaction_region(
current_version,
file_purger: Some(file_purger),
ttl: Some(ttl),
max_parallelism: req.max_parallelism,
})
}
@@ -266,6 +274,7 @@ impl Compactor for DefaultCompactor {
let mut futs = Vec::with_capacity(picker_output.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
let internal_parallelism = compaction_region.max_parallelism.max(1);
for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
@@ -358,9 +367,8 @@ impl Compactor for DefaultCompactor {
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
-let mut task_chunk =
-Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION);
-for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION {
+let mut task_chunk = Vec::with_capacity(internal_parallelism);
+for _ in 0..internal_parallelism {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_compact(task));
}
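
To make the chunking above concrete, here is a minimal, self-contained sketch of the same pattern, assuming a tokio runtime. `run_chunked` is an illustrative name, not part of this commit, and the real code spawns via `common_runtime::spawn_compact` rather than `tokio::spawn`:

use std::future::Future;

async fn run_chunked<F, T>(mut futs: Vec<F>, parallelism: usize) -> Vec<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    // Mirror `compaction_region.max_parallelism.max(1)`: a value of 0 must
    // not stall the loop.
    let parallelism = parallelism.max(1);
    let mut outputs = Vec::with_capacity(futs.len());
    while !futs.is_empty() {
        // Spawn at most `parallelism` tasks, then wait for all of them
        // before starting the next chunk.
        let mut chunk = Vec::with_capacity(parallelism);
        for _ in 0..parallelism {
            if let Some(fut) = futs.pop() {
                chunk.push(tokio::spawn(fut));
            }
        }
        for handle in chunk {
            outputs.push(handle.await.expect("compaction task panicked"));
        }
    }
    outputs
}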

View File

@@ -32,7 +32,7 @@ use crate::request::{
use crate::worker::WorkerListener;
/// Maximum number of compaction tasks in parallel.
-pub const MAX_PARALLEL_COMPACTION: usize = 8;
+pub const MAX_PARALLEL_COMPACTION: usize = 1;
pub(crate) struct CompactionTaskImpl {
pub compaction_region: CompactionRegion,
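
With this change the constant's role shifts: instead of capping every compaction at 8 concurrent merge tasks, it is the conservative built-in default (1) used where no explicit `max_parallelism` is supplied, such as the rescheduling path in the scheduler. Parallelism above 1 must now be requested per task.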

View File

@@ -464,6 +464,7 @@ async fn test_open_compaction_region() {
region_id,
region_dir: region_dir.clone(),
region_options: RegionOptions::default(),
max_parallelism: 1,
};
let compaction_region = open_compaction_region(

View File

@@ -45,6 +45,8 @@ impl<S> RegionWorkerLoop<S> {
sender,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
// TODO(yingwen): expose this to frontend
1,
)
.await
{
@@ -113,6 +115,7 @@ impl<S> RegionWorkerLoop<S> {
OptionOutputTx::none(),
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
1,
)
.await
{
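
The hard-coded 1 above is a placeholder until the knob reaches users, per the TODO. A hypothetical helper for that follow-up might look like this (both the function and the idea of a user-supplied option are illustrative, not part of this commit):

// Hypothetical: pick the effective parallelism once the setting is exposed,
// falling back to the built-in default and guarding against a zero value,
// as `DefaultCompactor` does with `.max(1)`.
fn effective_parallelism(user_setting: Option<usize>) -> usize {
    user_setting.unwrap_or(1).max(1)
}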