refactor: add fallback_to_local region option (#4578)

* refactor: add 'fallback_to_local_compaction' region option

* refactor: use 'fallback_to_local'
This commit is contained in:
zyy17
2024-08-23 11:09:43 +08:00
committed by GitHub
parent 8d61e6fe49
commit 5177717f71
4 changed files with 29 additions and 4 deletions

View File

@@ -48,8 +48,8 @@ use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
TimeRangePredicateOverflowSnafu,
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
};
use crate::metrics::COMPACTION_STAGE_ELAPSED;
use crate::read::projection::ProjectionMapper;
@@ -314,6 +314,16 @@ impl CompactionScheduler {
return Ok(());
}
Err(e) => {
if !current_version.options.compaction.fallback_to_local() {
error!(e; "Failed to schedule remote compaction job for region {}", region_id);
return RemoteCompactionSnafu {
region_id,
job_id: None,
reason: e.reason,
}
.fail();
}
error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
// Return the waiters back to the caller for local compaction.

View File

@@ -757,14 +757,14 @@ pub enum Error {
},
#[snafu(display(
"Failed to remotely compact region {} by job {} due to {}",
"Failed to remotely compact region {} by job {:?} due to {}",
region_id,
job_id,
reason
))]
RemoteCompaction {
region_id: RegionId,
job_id: JobId,
job_id: Option<JobId>,
reason: String,
#[snafu(implicit)]
location: Location,

View File

@@ -170,6 +170,12 @@ impl CompactionOptions {
CompactionOptions::Twcs(opts) => opts.remote_compaction,
}
}
pub(crate) fn fallback_to_local(&self) -> bool {
match self {
CompactionOptions::Twcs(opts) => opts.fallback_to_local,
}
}
}
impl Default for CompactionOptions {
@@ -201,6 +207,9 @@ pub struct TwcsOptions {
/// Whether to use remote compaction.
#[serde_as(as = "DisplayFromStr")]
pub remote_compaction: bool,
/// Whether to fall back to local compaction if remote compaction fails.
#[serde_as(as = "DisplayFromStr")]
pub fallback_to_local: bool,
}
with_prefix!(prefix_twcs "compaction.twcs.");
@@ -228,6 +237,7 @@ impl Default for TwcsOptions {
max_inactive_window_files: 1,
time_window: None,
remote_compaction: false,
fallback_to_local: true,
}
}
}
@@ -590,6 +600,7 @@ mod tests {
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("compaction.twcs.remote_compaction", "false"),
("compaction.twcs.fallback_to_local", "true"),
("storage", "S3"),
("append_mode", "false"),
("index.inverted_index.ignore_column_ids", "1,2,3"),
@@ -614,6 +625,7 @@ mod tests {
max_inactive_window_files: 3,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
fallback_to_local: true,
}),
storage: Some("S3".to_string()),
append_mode: false,
@@ -645,6 +657,7 @@ mod tests {
max_inactive_window_files: usize::MAX,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
fallback_to_local: true,
}),
storage: Some("S3".to_string()),
append_mode: false,
@@ -710,6 +723,7 @@ mod tests {
max_inactive_window_files: 7,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
fallback_to_local: true,
}),
storage: Some("S3".to_string()),
append_mode: false,

View File

@@ -33,6 +33,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
"compaction.twcs.max_inactive_window_files",
"compaction.twcs.time_window",
"compaction.twcs.remote_compaction",
"compaction.twcs.fallback_to_local",
"storage",
"index.inverted_index.ignore_column_ids",
"index.inverted_index.segment_row_count",