mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 11:52:54 +00:00
feat: manual compaction parallelism (#7086)
* feat/manual-compaction-parallelism: ### Add Parallelism Support to Compaction Requests - **`Cargo.lock` & `Cargo.toml`**: Updated `greptime-proto` dependency to a new revision. - **`flush_compact_table.rs`**: Enhanced `parse_compact_params` to support a new `parallelism` parameter, allowing users to specify the level of parallelism for table compaction. - **`handle_compaction.rs`**: Integrated `parallelism` into the compaction scheduling process, defaulting to 1 if not specified. - **`request.rs` & `region_request.rs`**: Modified `CompactRequest` to include `parallelism`, with logic to handle unspecifie values. - **`requests.rs`**: Updated `CompactTableRequest` structure to include an optional `parallelism` field. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat/manual-compaction-parallelism: ### Commit Message Enhance Compaction Request Handling - **`flush_compact_table.rs`**: - Renamed `parse_compact_params` to `parse_compact_request`. - Introduced `DEFAULT_COMPACTION_PARALLELISM` constant. - Updated parsing logic to handle keyword arguments for `strict_window` and `regular` compaction types, including `parallelism` and `window`. - Modified tests to reflect changes in parsing logic and default parallelism handling. - **`request.rs`**: - Updated `parallelism` handling in `RegionRequestBody::Compact` to use the new default value. - **`requests.rs`**: - Changed `CompactTableRequest` to use a non-optional `parallelism` field with a default value of 1. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat/manual-compaction-parallelism: ### Update `flush_compact_table.rs` Parameter Validation - Modified parameter validation in `flush_compact_table.rs` to restrict the maximum number of parameters from 4 to 3 in the `parse_compact_request` function. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat/manual-compaction-parallelism: Update `greptime-proto` dependency - Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5325,7 +5325,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a250938d7106b77da0ae915eb0c531411c28cfe3#a250938d7106b77da0ae915eb0c531411c28cfe3"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d000eedd3739c003bb139aa42cefe05521a60f7d#d000eedd3739c003bb139aa42cefe05521a60f7d"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.5",
|
||||
|
||||
@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a250938d7106b77da0ae915eb0c531411c28cfe3" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d000eedd3739c003bb139aa42cefe05521a60f7d" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -37,6 +37,8 @@ const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
|
||||
/// Compact type: strict window (short name).
|
||||
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
|
||||
|
||||
const DEFAULT_COMPACTION_PARALLELISM: u32 = 1;
|
||||
|
||||
#[admin_fn(
|
||||
name = FlushTableFunction,
|
||||
display_name = flush_table,
|
||||
@@ -95,7 +97,7 @@ pub(crate) async fn compact_table(
|
||||
query_ctx: &QueryContextRef,
|
||||
params: &[ValueRef<'_>],
|
||||
) -> Result<Value> {
|
||||
let request = parse_compact_params(params, query_ctx)?;
|
||||
let request = parse_compact_request(params, query_ctx)?;
|
||||
info!("Compact table request: {:?}", request);
|
||||
|
||||
let affected_rows = table_mutation_handler
|
||||
@@ -117,37 +119,46 @@ fn compact_signature() -> Signature {
|
||||
/// - `[<table_name>]`: only tables name provided, using default compaction type: regular
|
||||
/// - `[<table_name>, <type>]`: specify table name and compaction type. The compaction options will be default.
|
||||
/// - `[<table_name>, <type>, <options>]`: provides both type and type-specific options.
|
||||
fn parse_compact_params(
|
||||
/// - For `twcs`, it accepts `parallelism=[N]` where N is an unsigned 32 bits number
|
||||
/// - For `swcs`, it accepts two numeric parameter: `parallelism` and `window`.
|
||||
fn parse_compact_request(
|
||||
params: &[ValueRef<'_>],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<CompactTableRequest> {
|
||||
ensure!(
|
||||
!params.is_empty(),
|
||||
!params.is_empty() && params.len() <= 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: "Args cannot be empty",
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 1-4, have: {}",
|
||||
params.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let (table_name, compact_type) = match params {
|
||||
let (table_name, compact_type, parallelism) = match params {
|
||||
// 1. Only table name, strategy defaults to twcs and default parallelism.
|
||||
[ValueRef::String(table_name)] => (
|
||||
table_name,
|
||||
compact_request::Options::Regular(Default::default()),
|
||||
DEFAULT_COMPACTION_PARALLELISM,
|
||||
),
|
||||
// 2. Both table name and strategy are provided.
|
||||
[
|
||||
ValueRef::String(table_name),
|
||||
ValueRef::String(compact_ty_str),
|
||||
] => {
|
||||
let compact_type = parse_compact_type(compact_ty_str, None)?;
|
||||
(table_name, compact_type)
|
||||
let (compact_type, parallelism) = parse_compact_options(compact_ty_str, None)?;
|
||||
(table_name, compact_type, parallelism)
|
||||
}
|
||||
|
||||
// 3. Table name, strategy and strategy specific options
|
||||
[
|
||||
ValueRef::String(table_name),
|
||||
ValueRef::String(compact_ty_str),
|
||||
ValueRef::String(options_str),
|
||||
] => {
|
||||
let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
|
||||
(table_name, compact_type)
|
||||
let (compact_type, parallelism) =
|
||||
parse_compact_options(compact_ty_str, Some(options_str))?;
|
||||
(table_name, compact_type, parallelism)
|
||||
}
|
||||
_ => {
|
||||
return UnsupportedInputDataTypeSnafu {
|
||||
@@ -167,35 +178,126 @@ fn parse_compact_params(
|
||||
schema_name,
|
||||
table_name,
|
||||
compact_options: compact_type,
|
||||
parallelism,
|
||||
})
|
||||
}
|
||||
|
||||
/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chose,
|
||||
/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chosen,
|
||||
/// otherwise choose regular (TWCS) compaction.
|
||||
fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
|
||||
fn parse_compact_options(
|
||||
type_str: &str,
|
||||
option: Option<&str>,
|
||||
) -> Result<(compact_request::Options, u32)> {
|
||||
if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
|
||||
| type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
|
||||
{
|
||||
let window_seconds = option
|
||||
.map(|v| {
|
||||
i64::from_str(v).map_err(|_| {
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"Compact window is expected to be a valid number, provided: {}",
|
||||
v
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
})
|
||||
.transpose()?
|
||||
.unwrap_or(0);
|
||||
let Some(option_str) = option else {
|
||||
return Ok((
|
||||
compact_request::Options::StrictWindow(StrictWindow { window_seconds: 0 }),
|
||||
DEFAULT_COMPACTION_PARALLELISM,
|
||||
));
|
||||
};
|
||||
|
||||
Ok(compact_request::Options::StrictWindow(StrictWindow {
|
||||
window_seconds,
|
||||
}))
|
||||
// For compatibility, accepts single number as window size.
|
||||
if let Ok(window_seconds) = i64::from_str(option_str) {
|
||||
return Ok((
|
||||
compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
|
||||
DEFAULT_COMPACTION_PARALLELISM,
|
||||
));
|
||||
};
|
||||
|
||||
// Parse keyword arguments in forms: `key1=value1,key2=value2`
|
||||
let mut window_seconds = 0i64;
|
||||
let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
|
||||
|
||||
let pairs: Vec<&str> = option_str.split(',').collect();
|
||||
for pair in pairs {
|
||||
let kv: Vec<&str> = pair.trim().split('=').collect();
|
||||
if kv.len() != 2 {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!("Invalid key-value pair: {}", pair.trim()),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let key = kv[0].trim();
|
||||
let value = kv[1].trim();
|
||||
|
||||
match key {
|
||||
"window" | "window_seconds" => {
|
||||
window_seconds = i64::from_str(value).map_err(|_| {
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!("Invalid value for window: {}", value),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
}
|
||||
"parallelism" => {
|
||||
parallelism = value.parse::<u32>().map_err(|_| {
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!("Invalid value for parallelism: {}", value),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
}
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!("Unknown parameter: {}", key),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
|
||||
parallelism,
|
||||
))
|
||||
} else {
|
||||
Ok(compact_request::Options::Regular(Default::default()))
|
||||
// TWCS strategy
|
||||
let Some(option_str) = option else {
|
||||
return Ok((
|
||||
compact_request::Options::Regular(Default::default()),
|
||||
DEFAULT_COMPACTION_PARALLELISM,
|
||||
));
|
||||
};
|
||||
|
||||
let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
|
||||
let pairs: Vec<&str> = option_str.split(',').collect();
|
||||
for pair in pairs {
|
||||
let kv: Vec<&str> = pair.trim().split('=').collect();
|
||||
if kv.len() != 2 {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!("Invalid key-value pair: {}", pair.trim()),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let key = kv[0].trim();
|
||||
let value = kv[1].trim();
|
||||
|
||||
match key {
|
||||
"parallelism" => {
|
||||
parallelism = value.parse::<u32>().map_err(|_| {
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!("Invalid value for parallelism: {}", value),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
}
|
||||
_ => {
|
||||
return InvalidFuncArgsSnafu {
|
||||
err_msg: format!("Unknown parameter: {}", key),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
compact_request::Options::Regular(Default::default()),
|
||||
parallelism,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,7 +403,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
expected,
|
||||
&parse_compact_params(¶ms, &QueryContext::arc()).unwrap()
|
||||
&parse_compact_request(¶ms, &QueryContext::arc()).unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -316,6 +418,7 @@ mod tests {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::Regular(Default::default()),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -325,6 +428,7 @@ mod tests {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::Regular(Default::default()),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -337,6 +441,7 @@ mod tests {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::Regular(Default::default()),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -346,6 +451,7 @@ mod tests {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::Regular(Default::default()),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -355,6 +461,7 @@ mod tests {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -366,15 +473,7 @@ mod tests {
|
||||
compact_options: Options::StrictWindow(StrictWindow {
|
||||
window_seconds: 3600,
|
||||
}),
|
||||
},
|
||||
),
|
||||
(
|
||||
&["table", "regular", "abcd"],
|
||||
CompactTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::Regular(Default::default()),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -386,12 +485,82 @@ mod tests {
|
||||
compact_options: Options::StrictWindow(StrictWindow {
|
||||
window_seconds: 120,
|
||||
}),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
// Test with parallelism parameter
|
||||
(
|
||||
&["table", "regular", "parallelism=4"],
|
||||
CompactTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::Regular(Default::default()),
|
||||
parallelism: 4,
|
||||
},
|
||||
),
|
||||
(
|
||||
&["table", "strict_window", "window=3600,parallelism=2"],
|
||||
CompactTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::StrictWindow(StrictWindow {
|
||||
window_seconds: 3600,
|
||||
}),
|
||||
parallelism: 2,
|
||||
},
|
||||
),
|
||||
(
|
||||
&["table", "strict_window", "window=3600"],
|
||||
CompactTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::StrictWindow(StrictWindow {
|
||||
window_seconds: 3600,
|
||||
}),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
&["table", "strict_window", "window_seconds=7200"],
|
||||
CompactTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::StrictWindow(StrictWindow {
|
||||
window_seconds: 7200,
|
||||
}),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
&["table", "strict_window", "window=1800"],
|
||||
CompactTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::StrictWindow(StrictWindow {
|
||||
window_seconds: 1800,
|
||||
}),
|
||||
parallelism: 1,
|
||||
},
|
||||
),
|
||||
(
|
||||
&["table", "regular", "parallelism=8"],
|
||||
CompactTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table".to_string(),
|
||||
compact_options: Options::Regular(Default::default()),
|
||||
parallelism: 8,
|
||||
},
|
||||
),
|
||||
]);
|
||||
|
||||
assert!(
|
||||
parse_compact_params(
|
||||
parse_compact_request(
|
||||
&["table", "strict_window", "abc"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
@@ -402,7 +571,7 @@ mod tests {
|
||||
);
|
||||
|
||||
assert!(
|
||||
parse_compact_params(
|
||||
parse_compact_request(
|
||||
&["a.b.table", "strict_window", "abc"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
@@ -411,5 +580,88 @@ mod tests {
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// Test invalid parallelism
|
||||
assert!(
|
||||
parse_compact_request(
|
||||
&["table", "regular", "options", "invalid"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
.collect::<Vec<_>>(),
|
||||
&QueryContext::arc(),
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// Test too many parameters
|
||||
assert!(
|
||||
parse_compact_request(
|
||||
&["table", "regular", "options", "4", "extra"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
.collect::<Vec<_>>(),
|
||||
&QueryContext::arc(),
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// Test invalid keyword argument format
|
||||
assert!(
|
||||
parse_compact_request(
|
||||
&["table", "strict_window", "window"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
.collect::<Vec<_>>(),
|
||||
&QueryContext::arc(),
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// Test invalid keyword
|
||||
assert!(
|
||||
parse_compact_request(
|
||||
&["table", "strict_window", "invalid_key=123"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
.collect::<Vec<_>>(),
|
||||
&QueryContext::arc(),
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
|
||||
assert!(
|
||||
parse_compact_request(
|
||||
&["table", "regular", "abcd"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
.collect::<Vec<_>>(),
|
||||
&QueryContext::arc(),
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// Test invalid window value
|
||||
assert!(
|
||||
parse_compact_request(
|
||||
&["table", "strict_window", "window=abc"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
.collect::<Vec<_>>(),
|
||||
&QueryContext::arc(),
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// Test invalid parallelism in options string
|
||||
assert!(
|
||||
parse_compact_request(
|
||||
&["table", "strict_window", "parallelism=abc"]
|
||||
.into_iter()
|
||||
.map(ValueRef::String)
|
||||
.collect::<Vec<_>>(),
|
||||
&QueryContext::arc(),
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
return;
|
||||
};
|
||||
COMPACTION_REQUEST_COUNT.inc();
|
||||
let parallelism = req.parallelism.unwrap_or(1) as usize;
|
||||
if let Err(e) = self
|
||||
.compaction_scheduler
|
||||
.schedule_compaction(
|
||||
@@ -45,8 +46,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
sender,
|
||||
®ion.manifest_ctx,
|
||||
self.schema_metadata_manager.clone(),
|
||||
// TODO(yingwen): expose this to frontend
|
||||
1,
|
||||
parallelism,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -116,7 +116,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
OptionOutputTx::none(),
|
||||
®ion.manifest_ctx,
|
||||
self.schema_metadata_manager.clone(),
|
||||
1,
|
||||
1, // Default for automatic compaction
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -109,6 +109,7 @@ impl Requester {
|
||||
.map(|partition| {
|
||||
RegionRequestBody::Compact(CompactRequest {
|
||||
region_id: partition.id.into(),
|
||||
parallelism: request.parallelism,
|
||||
options: Some(request.compact_options),
|
||||
})
|
||||
})
|
||||
@@ -146,6 +147,7 @@ impl Requester {
|
||||
) -> Result<AffectedRows> {
|
||||
let request = RegionRequestBody::Compact(CompactRequest {
|
||||
region_id: region_id.into(),
|
||||
parallelism: 1,
|
||||
options: None, // todo(hl): maybe also support parameters in region compaction.
|
||||
});
|
||||
|
||||
|
||||
@@ -338,9 +338,18 @@ fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionR
|
||||
let options = compact
|
||||
.options
|
||||
.unwrap_or(compact_request::Options::Regular(Default::default()));
|
||||
// Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
|
||||
let parallelism = if compact.parallelism == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(compact.parallelism)
|
||||
};
|
||||
Ok(vec![(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest { options }),
|
||||
RegionRequest::Compact(RegionCompactRequest {
|
||||
options,
|
||||
parallelism,
|
||||
}),
|
||||
)])
|
||||
}
|
||||
|
||||
@@ -1332,6 +1341,7 @@ pub struct RegionFlushRequest {
|
||||
#[derive(Debug)]
|
||||
pub struct RegionCompactRequest {
|
||||
pub options: compact_request::Options,
|
||||
pub parallelism: Option<u32>,
|
||||
}
|
||||
|
||||
impl Default for RegionCompactRequest {
|
||||
@@ -1339,6 +1349,7 @@ impl Default for RegionCompactRequest {
|
||||
Self {
|
||||
// Default to regular compaction.
|
||||
options: compact_request::Options::Regular(Default::default()),
|
||||
parallelism: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -401,6 +401,7 @@ pub struct CompactTableRequest {
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub compact_options: compact_request::Options,
|
||||
pub parallelism: u32,
|
||||
}
|
||||
|
||||
impl Default for CompactTableRequest {
|
||||
@@ -410,6 +411,7 @@ impl Default for CompactTableRequest {
|
||||
schema_name: Default::default(),
|
||||
table_name: Default::default(),
|
||||
compact_options: compact_request::Options::Regular(Default::default()),
|
||||
parallelism: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user