mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-05 13:30:44 +00:00
feat: add repartition column hint (#8291)
* feat: add repartition column hint option Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: support altering repartition column hint Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: update sqlness result Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: reject time index repartition hint Signed-off-by: WenyXu <wenymedia@gmail.com> * refactor: treat rename table as metadata-only alter Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -29,8 +29,8 @@ use snafu::{OptionExt, ResultExt, ensure};
|
||||
use store_api::region_request::{SetRegionOption, UnsetRegionOption};
|
||||
use table::metadata::{TableId, TableMeta};
|
||||
use table::requests::{
|
||||
AddColumnRequest, AlterKind, AlterTableRequest, ModifyColumnTypeRequest, SetDefaultRequest,
|
||||
SetIndexOption, UnsetIndexOption,
|
||||
AddColumnRequest, AlterKind, AlterTableRequest, ModifyColumnTypeRequest,
|
||||
REPARTITION_COLUMN_HINT_KEY, SetDefaultRequest, SetIndexOption, UnsetIndexOption,
|
||||
};
|
||||
|
||||
use crate::error::{
|
||||
@@ -164,21 +164,53 @@ pub fn alter_expr_to_request(
|
||||
AlterKind::RenameTable { new_table_name }
|
||||
}
|
||||
Kind::SetTableOptions(api::v1::SetTableOptions { table_options }) => {
|
||||
AlterKind::SetTableOptions {
|
||||
options: table_options
|
||||
.iter()
|
||||
.map(SetRegionOption::try_from)
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.context(InvalidSetTableOptionRequestSnafu)?,
|
||||
let repartition_column_hint = table_options
|
||||
.iter()
|
||||
.find(|option| option.key == REPARTITION_COLUMN_HINT_KEY);
|
||||
if let Some(option) = repartition_column_hint {
|
||||
ensure!(
|
||||
table_options.len() == 1,
|
||||
error::InvalidTableOptionRequestSnafu {
|
||||
err_msg: format!(
|
||||
"{REPARTITION_COLUMN_HINT_KEY} must be altered separately"
|
||||
),
|
||||
}
|
||||
);
|
||||
AlterKind::SetRepartitionColumnHint {
|
||||
column_name: option.value.clone(),
|
||||
}
|
||||
} else {
|
||||
AlterKind::SetTableOptions {
|
||||
options: table_options
|
||||
.iter()
|
||||
.map(SetRegionOption::try_from)
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.context(InvalidSetTableOptionRequestSnafu)?,
|
||||
}
|
||||
}
|
||||
}
|
||||
Kind::UnsetTableOptions(api::v1::UnsetTableOptions { keys }) => {
|
||||
AlterKind::UnsetTableOptions {
|
||||
keys: keys
|
||||
.iter()
|
||||
.map(|key| UnsetRegionOption::try_from(key.as_str()))
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.context(InvalidUnsetTableOptionRequestSnafu)?,
|
||||
let unset_repartition_column_hint = keys
|
||||
.iter()
|
||||
.any(|key| key.as_str() == REPARTITION_COLUMN_HINT_KEY);
|
||||
if unset_repartition_column_hint {
|
||||
ensure!(
|
||||
keys.len() == 1,
|
||||
error::InvalidTableOptionRequestSnafu {
|
||||
err_msg: format!(
|
||||
"{REPARTITION_COLUMN_HINT_KEY} must be altered separately"
|
||||
),
|
||||
}
|
||||
);
|
||||
AlterKind::UnsetRepartitionColumnHint
|
||||
} else {
|
||||
AlterKind::UnsetTableOptions {
|
||||
keys: keys
|
||||
.iter()
|
||||
.map(|key| UnsetRegionOption::try_from(key.as_str()))
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.context(InvalidUnsetTableOptionRequestSnafu)?,
|
||||
}
|
||||
}
|
||||
}
|
||||
Kind::SetIndex(o) => {
|
||||
@@ -324,7 +356,7 @@ fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation
|
||||
mod tests {
|
||||
use api::v1::{
|
||||
AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn, ModifyColumnType,
|
||||
SemanticType,
|
||||
Option as PbOption, SemanticType, SetTableOptions, UnsetTableOptions,
|
||||
};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
|
||||
@@ -514,4 +546,72 @@ mod tests {
|
||||
assert_eq!(1, drop_names.len());
|
||||
assert_eq!("mem_usage".to_string(), drop_names.pop().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_repartition_column_hint_expr() {
|
||||
let expr = AlterTableExpr {
|
||||
catalog_name: "test_catalog".to_string(),
|
||||
schema_name: "test_schema".to_string(),
|
||||
table_name: "monitor".to_string(),
|
||||
kind: Some(Kind::SetTableOptions(SetTableOptions {
|
||||
table_options: vec![PbOption {
|
||||
key: REPARTITION_COLUMN_HINT_KEY.to_string(),
|
||||
value: "host".to_string(),
|
||||
}],
|
||||
})),
|
||||
};
|
||||
|
||||
let alter_request = alter_expr_to_request(1, expr, None).unwrap();
|
||||
match alter_request.alter_kind {
|
||||
AlterKind::SetRepartitionColumnHint { column_name } => {
|
||||
assert_eq!("host", column_name);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_repartition_column_hint_rejects_mixed_options() {
|
||||
let expr = AlterTableExpr {
|
||||
catalog_name: "test_catalog".to_string(),
|
||||
schema_name: "test_schema".to_string(),
|
||||
table_name: "monitor".to_string(),
|
||||
kind: Some(Kind::SetTableOptions(SetTableOptions {
|
||||
table_options: vec![
|
||||
PbOption {
|
||||
key: REPARTITION_COLUMN_HINT_KEY.to_string(),
|
||||
value: "host".to_string(),
|
||||
},
|
||||
PbOption {
|
||||
key: table::requests::TTL_KEY.to_string(),
|
||||
value: "7d".to_string(),
|
||||
},
|
||||
],
|
||||
})),
|
||||
};
|
||||
|
||||
let err = alter_expr_to_request(1, expr, None).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("repartition.column.hint must be altered separately")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unset_repartition_column_hint_expr() {
|
||||
let expr = AlterTableExpr {
|
||||
catalog_name: "test_catalog".to_string(),
|
||||
schema_name: "test_schema".to_string(),
|
||||
table_name: "monitor".to_string(),
|
||||
kind: Some(Kind::UnsetTableOptions(UnsetTableOptions {
|
||||
keys: vec![REPARTITION_COLUMN_HINT_KEY.to_string()],
|
||||
})),
|
||||
};
|
||||
|
||||
let alter_request = alter_expr_to_request(1, expr, None).unwrap();
|
||||
assert!(matches!(
|
||||
alter_request.alter_kind,
|
||||
AlterKind::UnsetRepartitionColumnHint
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,6 +111,13 @@ pub enum Error {
|
||||
error: MetadataError,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid table option request: {err_msg}"))]
|
||||
InvalidTableOptionRequest {
|
||||
err_msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid set fulltext option request"))]
|
||||
InvalidSetFulltextOptionRequest {
|
||||
#[snafu(implicit)]
|
||||
@@ -188,6 +195,7 @@ impl ErrorExt for Error {
|
||||
}
|
||||
Error::InvalidSetTableOptionRequest { .. }
|
||||
| Error::InvalidUnsetTableOptionRequest { .. }
|
||||
| Error::InvalidTableOptionRequest { .. }
|
||||
| Error::InvalidSetFulltextOptionRequest { .. }
|
||||
| Error::InvalidSetSkippingIndexOptionRequest { .. }
|
||||
| Error::MissingAlterIndexOption { .. }
|
||||
|
||||
@@ -19,8 +19,8 @@ mod region_request;
|
||||
use std::vec;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::RenameTable;
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::{RenameTable, SetTableOptions, UnsetTableOptions};
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
|
||||
@@ -35,6 +35,7 @@ use store_api::metadata::ColumnMetadata;
|
||||
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
|
||||
use strum::AsRefStr;
|
||||
use table::metadata::{TableId, TableInfo};
|
||||
use table::requests::REPARTITION_COLUMN_HINT_KEY;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::ddl::DdlContext;
|
||||
@@ -126,7 +127,7 @@ impl AlterTableProcedure {
|
||||
|
||||
// Safety: Checked in `AlterTableProcedure::new`.
|
||||
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
|
||||
if matches!(alter_kind, Kind::RenameTable { .. }) {
|
||||
if is_metadata_only_alter(alter_kind) {
|
||||
self.data.state = AlterTableState::UpdateMetadata;
|
||||
} else {
|
||||
self.data.state = AlterTableState::SubmitAlterRegionRequests;
|
||||
@@ -268,6 +269,7 @@ impl AlterTableProcedure {
|
||||
let table_info_value = self.data.table_info_value.as_ref().unwrap();
|
||||
// Safety: Checked in `AlterTableProcedure::new`.
|
||||
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
|
||||
let metadata_only_alter = is_metadata_only_alter(alter_kind);
|
||||
|
||||
// Gets the table info from the cache or builds it.
|
||||
let new_info = match &self.new_table_info {
|
||||
@@ -290,6 +292,7 @@ impl AlterTableProcedure {
|
||||
self.data.region_distribution.as_ref(),
|
||||
new_info,
|
||||
&self.data.column_metadatas,
|
||||
metadata_only_alter,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -338,6 +341,19 @@ impl AlterTableProcedure {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_metadata_only_alter(alter_kind: &Kind) -> bool {
|
||||
match alter_kind {
|
||||
Kind::RenameTable { .. } => true,
|
||||
Kind::SetTableOptions(SetTableOptions { table_options }) => {
|
||||
table_options.len() == 1 && table_options[0].key.as_str() == REPARTITION_COLUMN_HINT_KEY
|
||||
}
|
||||
Kind::UnsetTableOptions(UnsetTableOptions { keys }) => {
|
||||
keys.len() == 1 && keys[0].as_str() == REPARTITION_COLUMN_HINT_KEY
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Procedure for AlterTableProcedure {
|
||||
fn type_name(&self) -> &str {
|
||||
|
||||
@@ -121,6 +121,7 @@ impl AlterTableExecutor {
|
||||
region_distribution: Option<&RegionDistribution>,
|
||||
mut raw_table_info: TableInfo,
|
||||
column_metadatas: &[ColumnMetadata],
|
||||
metadata_only_alter: bool,
|
||||
) -> Result<()> {
|
||||
let table_ref = self.table.table_ref();
|
||||
let table_id = self.table_id;
|
||||
@@ -141,7 +142,7 @@ impl AlterTableExecutor {
|
||||
);
|
||||
|
||||
ensure!(
|
||||
region_distribution.is_some(),
|
||||
metadata_only_alter || region_distribution.is_some(),
|
||||
UnexpectedSnafu {
|
||||
err_msg: "region distribution is not set when updating table metadata",
|
||||
}
|
||||
@@ -298,6 +299,8 @@ fn build_new_table_info(
|
||||
| AlterKind::ModifyColumnTypes { .. }
|
||||
| AlterKind::SetTableOptions { .. }
|
||||
| AlterKind::UnsetTableOptions { .. }
|
||||
| AlterKind::SetRepartitionColumnHint { .. }
|
||||
| AlterKind::UnsetRepartitionColumnHint
|
||||
| AlterKind::SetIndexes { .. }
|
||||
| AlterKind::UnsetIndexes { .. }
|
||||
| AlterKind::DropDefaults { .. }
|
||||
|
||||
@@ -89,7 +89,8 @@ use table::TableRef;
|
||||
use table::dist_table::DistTable;
|
||||
use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
|
||||
use table::requests::{
|
||||
AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
|
||||
AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, REPARTITION_COLUMN_HINT_KEY,
|
||||
TableOptions,
|
||||
};
|
||||
use table::table_name::TableName;
|
||||
use table::table_reference::TableReference;
|
||||
@@ -2300,9 +2301,16 @@ pub fn create_table_info(
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let table_options = TableOptions::try_from_iter(&create_table.table_options)
|
||||
let mut table_options = TableOptions::try_from_iter(&create_table.table_options)
|
||||
.context(UnrecognizedTableOptionSnafu)?;
|
||||
|
||||
validate_repartition_column_hint(
|
||||
&mut table_options,
|
||||
&column_name_to_index_map,
|
||||
&partition_key_indices,
|
||||
&create_table.time_index,
|
||||
)?;
|
||||
|
||||
let meta = TableMeta {
|
||||
schema,
|
||||
primary_key_indices,
|
||||
@@ -2338,6 +2346,61 @@ pub fn create_table_info(
|
||||
Ok(table_info)
|
||||
}
|
||||
|
||||
fn validate_repartition_column_hint(
|
||||
table_options: &mut TableOptions,
|
||||
column_name_to_index_map: &HashMap<String, usize>,
|
||||
partition_key_indices: &[usize],
|
||||
time_index: &str,
|
||||
) -> Result<()> {
|
||||
let Some(column_name) = table_options
|
||||
.extra_options
|
||||
.get(REPARTITION_COLUMN_HINT_KEY)
|
||||
.map(|value| value.trim().to_string())
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
ensure!(
|
||||
!column_name.is_empty(),
|
||||
InvalidPartitionRuleSnafu {
|
||||
reason: format!("{REPARTITION_COLUMN_HINT_KEY} expects exactly one column name"),
|
||||
}
|
||||
);
|
||||
|
||||
ensure!(
|
||||
!column_name.contains(','),
|
||||
InvalidPartitionRuleSnafu {
|
||||
reason: format!("{REPARTITION_COLUMN_HINT_KEY} expects exactly one column name"),
|
||||
}
|
||||
);
|
||||
|
||||
ensure!(
|
||||
partition_key_indices.is_empty(),
|
||||
InvalidPartitionRuleSnafu {
|
||||
reason: format!(
|
||||
"cannot set {REPARTITION_COLUMN_HINT_KEY} on a table with partition metadata"
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
column_name_to_index_map
|
||||
.get(&column_name)
|
||||
.context(ColumnNotFoundSnafu { msg: &column_name })?;
|
||||
|
||||
ensure!(
|
||||
column_name != time_index,
|
||||
InvalidPartitionRuleSnafu {
|
||||
reason: format!("cannot set {REPARTITION_COLUMN_HINT_KEY} to the time index column"),
|
||||
}
|
||||
);
|
||||
|
||||
table_options
|
||||
.extra_options
|
||||
.insert(REPARTITION_COLUMN_HINT_KEY.to_string(), column_name);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
|
||||
let columns = if let Some(partitions) = partitions {
|
||||
partitions
|
||||
@@ -2821,6 +2884,144 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
|
||||
assert!(err.to_string().contains("device_id"));
|
||||
}
|
||||
|
||||
fn create_expr_from_sql(sql: &str) -> CreateTableExpr {
|
||||
let result =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap();
|
||||
|
||||
match &result[0] {
|
||||
Statement::CreateTable(create) => {
|
||||
expr_helper::create_to_expr(create, &QueryContext::arc()).unwrap()
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_table_with_repartition_column_hint() {
|
||||
let expr = create_expr_from_sql(
|
||||
r"
|
||||
CREATE TABLE metrics (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
WITH ('repartition.column.hint' = ' host ')",
|
||||
);
|
||||
|
||||
let table_info = create_table_info(&expr, vec![]).unwrap();
|
||||
assert_eq!(
|
||||
table_info
|
||||
.meta
|
||||
.options
|
||||
.extra_options
|
||||
.get(REPARTITION_COLUMN_HINT_KEY),
|
||||
Some(&"host".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_table_with_empty_repartition_column_hint() {
|
||||
let expr = create_expr_from_sql(
|
||||
r"
|
||||
CREATE TABLE metrics (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
WITH ('repartition.column.hint' = '')",
|
||||
);
|
||||
|
||||
let err = create_table_info(&expr, vec![]).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("repartition.column.hint expects exactly one column name")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_table_with_multiple_repartition_column_hints() {
|
||||
let expr = create_expr_from_sql(
|
||||
r"
|
||||
CREATE TABLE metrics (
|
||||
host STRING,
|
||||
region_id STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
WITH ('repartition.column.hint' = 'host,region_id')",
|
||||
);
|
||||
|
||||
let err = create_table_info(&expr, vec![]).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("repartition.column.hint expects exactly one column name")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_table_with_missing_repartition_column_hint() {
|
||||
let expr = create_expr_from_sql(
|
||||
r"
|
||||
CREATE TABLE metrics (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
WITH ('repartition.column.hint' = 'region_id')",
|
||||
);
|
||||
|
||||
let err = create_table_info(&expr, vec![]).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("Cannot find column by name: region")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_table_with_time_index_repartition_column_hint() {
|
||||
let expr = create_expr_from_sql(
|
||||
r"
|
||||
CREATE TABLE metrics (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
WITH ('repartition.column.hint' = 'ts')",
|
||||
);
|
||||
|
||||
let err = create_table_info(&expr, vec![]).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("cannot set repartition.column.hint to the time index column")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_partitioned_table_with_repartition_column_hint() {
|
||||
let expr = create_expr_from_sql(
|
||||
r"
|
||||
CREATE TABLE metrics (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
WITH ('repartition.column.hint' = 'host')",
|
||||
);
|
||||
|
||||
let err = create_table_info(&expr, vec!["host".to_string()]).unwrap_err();
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("cannot set repartition.column.hint on a table with partition metadata")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "TODO(ruihang): WIP new partition rule"]
|
||||
async fn test_parse_partitions() {
|
||||
|
||||
@@ -36,8 +36,8 @@ use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::requests::{
|
||||
AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
|
||||
TableOptions, UnsetIndexOption,
|
||||
AddColumnRequest, AlterKind, ModifyColumnTypeRequest, REPARTITION_COLUMN_HINT_KEY,
|
||||
SetDefaultRequest, SetIndexOption, TableOptions, UnsetIndexOption,
|
||||
};
|
||||
use crate::table_reference::TableReference;
|
||||
|
||||
@@ -330,6 +330,10 @@ impl TableMeta {
|
||||
AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
|
||||
AlterKind::SetTableOptions { options } => self.set_table_options(options),
|
||||
AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
|
||||
AlterKind::SetRepartitionColumnHint { column_name } => {
|
||||
self.set_repartition_column_hint(table_name, column_name)
|
||||
}
|
||||
AlterKind::UnsetRepartitionColumnHint => self.unset_repartition_column_hint(),
|
||||
AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
|
||||
AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
|
||||
AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
|
||||
@@ -387,6 +391,72 @@ impl TableMeta {
|
||||
self.set_table_options(&requests)
|
||||
}
|
||||
|
||||
fn set_repartition_column_hint(
|
||||
&self,
|
||||
table_name: &str,
|
||||
column_name: &str,
|
||||
) -> Result<TableMetaBuilder> {
|
||||
let column_name = column_name.trim();
|
||||
ensure!(
|
||||
!column_name.is_empty() && !column_name.contains(','),
|
||||
error::InvalidAlterRequestSnafu {
|
||||
table: table_name,
|
||||
err: format!("{REPARTITION_COLUMN_HINT_KEY} expects exactly one column name"),
|
||||
}
|
||||
);
|
||||
|
||||
ensure!(
|
||||
self.partition_key_indices.is_empty(),
|
||||
error::InvalidAlterRequestSnafu {
|
||||
table: table_name,
|
||||
err: format!(
|
||||
"cannot set {REPARTITION_COLUMN_HINT_KEY} on a table with partition metadata"
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let column_index = self
|
||||
.schema
|
||||
.column_index_by_name(column_name)
|
||||
.with_context(|| error::ColumnNotExistsSnafu {
|
||||
column_name,
|
||||
table_name,
|
||||
})?;
|
||||
|
||||
if let Some(time_index) = self.schema.timestamp_index() {
|
||||
ensure!(
|
||||
column_index != time_index,
|
||||
error::InvalidAlterRequestSnafu {
|
||||
table: table_name,
|
||||
err: format!(
|
||||
"cannot set {REPARTITION_COLUMN_HINT_KEY} to the time index column"
|
||||
),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
let mut new_options = self.options.clone();
|
||||
new_options.extra_options.insert(
|
||||
REPARTITION_COLUMN_HINT_KEY.to_string(),
|
||||
column_name.to_string(),
|
||||
);
|
||||
|
||||
let mut builder = self.new_meta_builder();
|
||||
builder.options(new_options);
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
fn unset_repartition_column_hint(&self) -> Result<TableMetaBuilder> {
|
||||
let mut new_options = self.options.clone();
|
||||
new_options
|
||||
.extra_options
|
||||
.remove(REPARTITION_COLUMN_HINT_KEY);
|
||||
|
||||
let mut builder = self.new_meta_builder();
|
||||
builder.options(new_options);
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
fn set_indexes(
|
||||
&self,
|
||||
table_name: &str,
|
||||
@@ -1292,7 +1362,9 @@ impl TableInfo {
|
||||
///
|
||||
/// All "region options" are actually a copy of table options for redundancy.
|
||||
pub fn to_region_options(&self) -> HashMap<String, String> {
|
||||
HashMap::from(&self.meta.options)
|
||||
let mut options = HashMap::from(&self.meta.options);
|
||||
options.remove(REPARTITION_COLUMN_HINT_KEY);
|
||||
options
|
||||
}
|
||||
|
||||
/// Returns the table reference.
|
||||
@@ -1592,6 +1664,214 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_repartition_column_hint() {
|
||||
let meta = TableMetaBuilder::empty()
|
||||
.schema(Arc::new(new_test_schema()))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let alter_kind = AlterKind::SetRepartitionColumnHint {
|
||||
column_name: " col1 ".to_string(),
|
||||
};
|
||||
let new_meta = meta
|
||||
.builder_with_alter_kind("my_table", &alter_kind)
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
Some("col1"),
|
||||
new_meta
|
||||
.options
|
||||
.extra_options
|
||||
.get(REPARTITION_COLUMN_HINT_KEY)
|
||||
.map(String::as_str)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_repartition_column_hint_rejects_empty_column() {
|
||||
let meta = TableMetaBuilder::empty()
|
||||
.schema(Arc::new(new_test_schema()))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let alter_kind = AlterKind::SetRepartitionColumnHint {
|
||||
column_name: " ".to_string(),
|
||||
};
|
||||
let err = meta
|
||||
.builder_with_alter_kind("my_table", &alter_kind)
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("repartition.column.hint expects exactly one column name")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_repartition_column_hint_rejects_multiple_columns() {
|
||||
let meta = TableMetaBuilder::empty()
|
||||
.schema(Arc::new(new_test_schema()))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let alter_kind = AlterKind::SetRepartitionColumnHint {
|
||||
column_name: "col1,col2".to_string(),
|
||||
};
|
||||
let err = meta
|
||||
.builder_with_alter_kind("my_table", &alter_kind)
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("repartition.column.hint expects exactly one column name")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_repartition_column_hint_rejects_missing_column() {
|
||||
let meta = TableMetaBuilder::empty()
|
||||
.schema(Arc::new(new_test_schema()))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let alter_kind = AlterKind::SetRepartitionColumnHint {
|
||||
column_name: "missing".to_string(),
|
||||
};
|
||||
let err = meta
|
||||
.builder_with_alter_kind("my_table", &alter_kind)
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
assert!(err.to_string().contains("Column missing not exists"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_repartition_column_hint_rejects_time_index_column() {
|
||||
let meta = TableMetaBuilder::empty()
|
||||
.schema(Arc::new(new_test_schema()))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let alter_kind = AlterKind::SetRepartitionColumnHint {
|
||||
column_name: "ts".to_string(),
|
||||
};
|
||||
let err = meta
|
||||
.builder_with_alter_kind("my_table", &alter_kind)
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("cannot set repartition.column.hint to the time index column")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_repartition_column_hint_rejects_partitioned_table() {
|
||||
let meta = TableMetaBuilder::empty()
|
||||
.schema(Arc::new(new_test_schema()))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.partition_key_indices(vec![0])
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let alter_kind = AlterKind::SetRepartitionColumnHint {
|
||||
column_name: "col1".to_string(),
|
||||
};
|
||||
let err = meta
|
||||
.builder_with_alter_kind("my_table", &alter_kind)
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
err.to_string()
|
||||
.contains("cannot set repartition.column.hint on a table with partition metadata")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unset_repartition_column_hint() {
|
||||
let mut table_options = TableOptions::default();
|
||||
table_options
|
||||
.extra_options
|
||||
.insert(REPARTITION_COLUMN_HINT_KEY.to_string(), "col1".to_string());
|
||||
let meta = TableMetaBuilder::empty()
|
||||
.schema(Arc::new(new_test_schema()))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.options(table_options)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let new_meta = meta
|
||||
.builder_with_alter_kind("my_table", &AlterKind::UnsetRepartitionColumnHint)
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
!new_meta
|
||||
.options
|
||||
.extra_options
|
||||
.contains_key(REPARTITION_COLUMN_HINT_KEY)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_repartition_column_hint_is_not_region_option() {
|
||||
let mut table_options = TableOptions::default();
|
||||
table_options
|
||||
.extra_options
|
||||
.insert(REPARTITION_COLUMN_HINT_KEY.to_string(), "col1".to_string());
|
||||
let table_info = TableInfoBuilder::default()
|
||||
.table_id(1)
|
||||
.table_version(0)
|
||||
.name("my_table")
|
||||
.catalog_name(DEFAULT_CATALOG_NAME)
|
||||
.schema_name(DEFAULT_SCHEMA_NAME)
|
||||
.meta(
|
||||
TableMetaBuilder::empty()
|
||||
.schema(Arc::new(new_test_schema()))
|
||||
.primary_key_indices(vec![0])
|
||||
.engine("engine")
|
||||
.next_column_id(3)
|
||||
.options(table_options)
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
!table_info
|
||||
.to_region_options()
|
||||
.contains_key(REPARTITION_COLUMN_HINT_KEY)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_columns_multiple_times() {
|
||||
let schema = Arc::new(new_test_schema());
|
||||
|
||||
@@ -62,7 +62,7 @@ pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1";
|
||||
pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat";
|
||||
pub const OTLP_METRIC_COMPAT_PROM: &str = "prom";
|
||||
|
||||
pub const VALID_TABLE_OPTION_KEYS: [&str; 13] = [
|
||||
pub const VALID_TABLE_OPTION_KEYS: [&str; 14] = [
|
||||
// common keys:
|
||||
WRITE_BUFFER_SIZE_KEY,
|
||||
TTL_KEY,
|
||||
@@ -80,6 +80,7 @@ pub const VALID_TABLE_OPTION_KEYS: [&str; 13] = [
|
||||
// table model info
|
||||
TABLE_DATA_MODEL,
|
||||
OTLP_METRIC_COMPAT_KEY,
|
||||
REPARTITION_COLUMN_HINT_KEY,
|
||||
];
|
||||
|
||||
pub const DDL_TIMEOUT: &str = "timeout";
|
||||
@@ -161,6 +162,7 @@ pub const COMMENT_KEY: &str = "comment";
|
||||
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
|
||||
pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
|
||||
pub const TRACE_TABLE_PARTITIONS_HINT_KEY: &str = "trace_table_partitions";
|
||||
pub const REPARTITION_COLUMN_HINT_KEY: &str = "repartition.column.hint";
|
||||
|
||||
impl TableOptions {
|
||||
pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(
|
||||
@@ -307,6 +309,10 @@ pub enum AlterKind {
|
||||
UnsetTableOptions {
|
||||
keys: Vec<UnsetRegionOption>,
|
||||
},
|
||||
SetRepartitionColumnHint {
|
||||
column_name: String,
|
||||
},
|
||||
UnsetRepartitionColumnHint,
|
||||
SetIndexes {
|
||||
options: Vec<SetIndexOption>,
|
||||
},
|
||||
@@ -498,6 +504,7 @@ mod tests {
|
||||
assert!(validate_table_option(WRITE_BUFFER_SIZE_KEY));
|
||||
assert!(validate_table_option(STORAGE_KEY));
|
||||
assert!(validate_table_option(MEMTABLE_BULK_MERGE_THRESHOLD));
|
||||
assert!(validate_table_option(REPARTITION_COLUMN_HINT_KEY));
|
||||
assert!(!validate_table_option("foo"));
|
||||
|
||||
// Only whitelisted semantic keys are accepted.
|
||||
|
||||
@@ -90,6 +90,96 @@ drop table table_without_partition;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE hint_table (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
WITH ('repartition.column.hint' = 'host');
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE hint_table;
|
||||
|
||||
+------------+-------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+------------+-------------------------------------------+
|
||||
| hint_table | CREATE TABLE IF NOT EXISTS "hint_table" ( |
|
||||
| | "host" STRING NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | 'repartition.column.hint' = 'host' |
|
||||
| | ) |
|
||||
+------------+-------------------------------------------+
|
||||
|
||||
DROP TABLE hint_table;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE alter_hint_table (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
ALTER TABLE alter_hint_table SET 'repartition.column.hint' = 'host';
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE alter_hint_table;
|
||||
|
||||
+------------------+-------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+------------------+-------------------------------------------------+
|
||||
| alter_hint_table | CREATE TABLE IF NOT EXISTS "alter_hint_table" ( |
|
||||
| | "host" STRING NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | 'repartition.column.hint' = 'host' |
|
||||
| | ) |
|
||||
+------------------+-------------------------------------------------+
|
||||
|
||||
ALTER TABLE alter_hint_table UNSET 'repartition.column.hint';
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE alter_hint_table;
|
||||
|
||||
+------------------+-------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+------------------+-------------------------------------------------+
|
||||
| alter_hint_table | CREATE TABLE IF NOT EXISTS "alter_hint_table" ( |
|
||||
| | "host" STRING NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("host") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | |
|
||||
+------------------+-------------------------------------------------+
|
||||
|
||||
DROP TABLE alter_hint_table;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE not_supported_table_storage_option (
|
||||
`id` INT UNSIGNED,
|
||||
host STRING,
|
||||
|
||||
@@ -32,6 +32,35 @@ show create table table_without_partition;
|
||||
|
||||
drop table table_without_partition;
|
||||
|
||||
CREATE TABLE hint_table (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
)
|
||||
WITH ('repartition.column.hint' = 'host');
|
||||
|
||||
SHOW CREATE TABLE hint_table;
|
||||
|
||||
DROP TABLE hint_table;
|
||||
|
||||
CREATE TABLE alter_hint_table (
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
);
|
||||
|
||||
ALTER TABLE alter_hint_table SET 'repartition.column.hint' = 'host';
|
||||
|
||||
SHOW CREATE TABLE alter_hint_table;
|
||||
|
||||
ALTER TABLE alter_hint_table UNSET 'repartition.column.hint';
|
||||
|
||||
SHOW CREATE TABLE alter_hint_table;
|
||||
|
||||
DROP TABLE alter_hint_table;
|
||||
|
||||
CREATE TABLE not_supported_table_storage_option (
|
||||
`id` INT UNSIGNED,
|
||||
host STRING,
|
||||
|
||||
Reference in New Issue
Block a user