feat: support alter twcs compaction options (#4965)

support alter twcs compression options
This commit is contained in:
Lanqing Yang
2024-11-12 22:27:32 -08:00
committed by GitHub
parent 3bf9981aab
commit 3bbcde8e58
6 changed files with 305 additions and 25 deletions

View File

@@ -14,11 +14,18 @@
//! Handling alter related requests.
use std::str::FromStr;
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use humantime_serde::re::humantime;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::metadata::{
InvalidRegionOptionChangeRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
RegionMetadataRef,
};
use store_api::mito_engine_options;
use store_api::region_request::{AlterKind, ChangeOption, RegionAlterRequest};
use store_api::storage::RegionId;
@@ -27,6 +34,8 @@ use crate::error::{
};
use crate::flush::FlushReason;
use crate::manifest::action::RegionChange;
use crate::region::options::CompactionOptions::Twcs;
use crate::region::options::TwcsOptions;
use crate::region::version::VersionRef;
use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
@@ -50,7 +59,10 @@ impl<S> RegionWorkerLoop<S> {
// fast path for memory state changes like options.
if let AlterKind::ChangeRegionOptions { options } = request.kind {
self.handle_alter_region_options(region, version, options, sender);
match self.handle_alter_region_options(region, version, options) {
Ok(_) => sender.send(Ok(0)),
Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
}
return;
}
@@ -151,8 +163,7 @@ impl<S> RegionWorkerLoop<S> {
region: MitoRegionRef,
version: VersionRef,
options: Vec<ChangeOption>,
sender: OptionOutputTx,
) {
) -> std::result::Result<(), MetadataError> {
let mut current_options = version.options.clone();
for option in options {
match option {
@@ -167,10 +178,20 @@ impl<S> RegionWorkerLoop<S> {
current_options.ttl = Some(new_ttl);
}
}
ChangeOption::Twsc(key, value) => {
let Twcs(options) = &mut current_options.compaction;
change_twcs_options(
options,
&TwcsOptions::default(),
&key,
&value,
region.region_id,
)?;
}
}
}
region.version_control.alter_options(current_options);
sender.send(Ok(0));
Ok(())
}
}
@@ -191,3 +212,89 @@ fn metadata_after_alteration(
Ok(Arc::new(new_meta))
}
fn change_twcs_options(
options: &mut TwcsOptions,
default_option: &TwcsOptions,
key: &str,
value: &str,
region_id: RegionId,
) -> std::result::Result<(), MetadataError> {
match key {
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => {
let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?;
log_option_update(region_id, key, options.max_active_window_runs, runs);
options.max_active_window_runs = runs;
}
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(key, value, default_option.max_active_window_files)?;
log_option_update(region_id, key, options.max_active_window_files, files);
options.max_active_window_files = files;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => {
let runs =
parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?;
log_option_update(region_id, key, options.max_inactive_window_runs, runs);
options.max_inactive_window_runs = runs;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(key, value, default_option.max_inactive_window_files)?;
log_option_update(region_id, key, options.max_inactive_window_files, files);
options.max_inactive_window_files = files;
}
mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
let size =
if value.is_empty() {
default_option.max_output_file_size
} else {
Some(ReadableSize::from_str(value).map_err(|_| {
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
log_option_update(region_id, key, options.max_output_file_size, size);
options.max_output_file_size = size;
}
mito_engine_options::TWCS_TIME_WINDOW => {
let window =
if value.is_empty() {
default_option.time_window
} else {
Some(humantime::parse_duration(value).map_err(|_| {
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
log_option_update(region_id, key, options.time_window, window);
options.time_window = window;
}
_ => return InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
}
Ok(())
}
fn parse_usize_with_default(
key: &str,
value: &str,
default: usize,
) -> std::result::Result<usize, MetadataError> {
if value.is_empty() {
Ok(default)
} else {
value
.parse::<usize>()
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
}
fn log_option_update<T: std::fmt::Debug>(
region_id: RegionId,
option_name: &str,
prev_value: T,
cur_value: T,
) {
info!(
"Update region {}: {}, previous: {:?}, new: {:?}",
option_name, region_id, prev_value, cur_value
);
}

View File

@@ -23,20 +23,40 @@ pub const APPEND_MODE_KEY: &str = "append_mode";
pub const MERGE_MODE_KEY: &str = "merge_mode";
/// Option key for TTL(time-to-live)
pub const TTL_KEY: &str = "ttl";
/// Option key for compaction type.
pub const COMPACTION_TYPE: &str = "compaction.type";
/// TWCS compaction strategy.
pub const COMPACTION_TYPE_TWCS: &str = "twcs";
/// Option key for twcs max active window runs.
pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs";
/// Option key for twcs max active window files.
pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files";
/// Option key for twcs max inactive window runs.
pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs";
/// Option key for twcs max inactive window files.
pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files";
/// Option key for twcs max output file size.
pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size";
/// Option key for twcs time window.
pub const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window";
/// Option key for twcs remote compaction.
pub const REMOTE_COMPACTION: &str = "compaction.twcs.remote_compaction";
/// Option key for twcs fallback to local.
pub const TWCS_FALLBACK_TO_LOCAL: &str = "compaction.twcs.fallback_to_local";
/// Returns true if the `key` is a valid option key for the mito engine.
pub fn is_mito_engine_option_key(key: &str) -> bool {
[
"ttl",
"compaction.type",
"compaction.twcs.max_active_window_runs",
"compaction.twcs.max_active_window_files",
"compaction.twcs.max_inactive_window_runs",
"compaction.twcs.max_inactive_window_files",
"compaction.twcs.max_output_file_size",
"compaction.twcs.time_window",
"compaction.twcs.remote_compaction",
"compaction.twcs.fallback_to_local",
COMPACTION_TYPE,
TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_ACTIVE_WINDOW_FILES,
TWCS_MAX_INACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES,
TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
REMOTE_COMPACTION,
TWCS_FALLBACK_TO_LOCAL,
"storage",
"index.inverted_index.ignore_column_ids",
"index.inverted_index.segment_row_count",

View File

@@ -38,7 +38,11 @@ use crate::metadata::{
InvalidRegionOptionChangeRequestSnafu, InvalidRegionRequestSnafu, MetadataError,
RegionMetadata, Result,
};
use crate::mito_engine_options::TTL_KEY;
use crate::mito_engine_options::{
TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
};
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
@@ -712,6 +716,8 @@ impl From<v1::ChangeColumnType> for ChangeColumnType {
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum ChangeOption {
TTL(Duration),
// Modifying TwscOptions with values as (option name, new value).
Twsc(String, String),
}
impl TryFrom<&ChangeTableOption> for ChangeOption {
@@ -719,16 +725,24 @@ impl TryFrom<&ChangeTableOption> for ChangeOption {
fn try_from(value: &ChangeTableOption) -> std::result::Result<Self, Self::Error> {
let ChangeTableOption { key, value } = value;
if key == TTL_KEY {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
} else {
InvalidRegionOptionChangeRequestSnafu { key, value }.fail()
match key.as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
}
TWCS_MAX_ACTIVE_WINDOW_RUNS
| TWCS_MAX_ACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_RUNS
| TWCS_MAX_OUTPUT_FILE_SIZE
| TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
_ => InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
}
}
}

View File

@@ -26,6 +26,7 @@ use datatypes::schema::{
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
use store_api::region_request::ChangeOption;
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
@@ -225,6 +226,21 @@ impl TableMeta {
new_options.ttl = Some(*new_ttl);
}
}
ChangeOption::Twsc(key, value) => {
if !value.is_empty() {
new_options
.extra_options
.insert(key.to_string(), value.to_string());
// Ensure node restart correctly.
new_options.extra_options.insert(
COMPACTION_TYPE.to_string(),
COMPACTION_TYPE_TWCS.to_string(),
);
} else {
// Invalidate the previous change option if an empty value has been set.
new_options.extra_options.remove(key.as_str());
}
}
}
}
let mut builder = self.new_meta_builder();

View File

@@ -137,6 +137,108 @@ SELECT i FROM ato;
| 2 |
+---+
ALTER TABLE ato SET 'compaction.twcs.time_window'='2h';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6';
Affected Rows: 0
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6';
Affected Rows: 0
SHOW CREATE TABLE ato;
+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | compaction.twcs.time_window = '2h', |
| | compaction.type = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+
ALTER TABLE ato SET 'compaction.twcs.time_window'='';
Affected Rows: 0
SHOW CREATE TABLE ato;
+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | compaction.type = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+
-- SQLNESS ARG restart=true
SHOW CREATE TABLE ato;
+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | compaction.type = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+
DROP TABLE ato;
Affected Rows: 0

View File

@@ -28,4 +28,25 @@ SHOW CREATE TABLE ato;
SELECT i FROM ato;
ALTER TABLE ato SET 'compaction.twcs.time_window'='2h';
ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB';
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2';
ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2';
ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6';
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6';
SHOW CREATE TABLE ato;
ALTER TABLE ato SET 'compaction.twcs.time_window'='';
SHOW CREATE TABLE ato;
-- SQLNESS ARG restart=true
SHOW CREATE TABLE ato;
DROP TABLE ato;