Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-04 04:12:55 +00:00
feat: Support altering table TTL (#4848)
* feat/alter-ttl: Update greptime-proto source and add ChangeTableOptions handling
  - Change greptime-proto source repository and revision in Cargo.lock and Cargo.toml
  - Implement handling for ChangeTableOptions in grpc-expr and meta modules
  - Add support for parsing and applying region option changes in mito2
  - Introduce new error type for invalid change table option requests
  - Add humantime dependency to store-api
  - Fix SQL syntax in tests for changing column types
* chore: remove write buffer size option handling, since we don't support specifying write_buffer_size for a single table or region
* persist ttl to manifest
* chore: add sqlness
* fix: sqlness
* fix: typo and toml format
* fix: tests
* update: change alter syntax
* feat/alter-ttl: Add Clone trait to RegionFlushRequest and remove redundant Default derive in region_request.rs
* feat/alter-ttl: Refactor code to replace 'ChangeTableOption' with 'ChangeRegionOption' and handle TTL as a region option
  - Rename ChangeTableOption to ChangeRegionOption across various files.
  - Update AlterKind::ChangeTableOptions to AlterKind::ChangeRegionOptions.
  - Modify TTL handling to treat '0d' as None for TTL in table options.
  - Adjust related function names and comments to reflect the change from table to region options.
  - Include test case updates to verify the new TTL handling behavior.
* chore: update format
* refactor: update region options in DatanodeTableValue
* feat/alter-ttl: Remove TTL handling from RegionManifest and related structures
  - Eliminate TTL fields from `RegionManifest`, `RegionChange`, and associated handling logic.
  - Update tests and checksums to reflect removal of TTL.
  - Refactor `RegionOpener` and `handle_alter` to adjust to TTL removal.
  - Simplify `RegionChangeResult` by replacing `change` with `new_meta`.
* chore: fmt
* remove useless delete op
* feat/alter-ttl: Update Cargo.lock and the gRPC expression Cargo.toml to include the store-api dependency
  - Refactor alter.rs to use ChangeOption from store-api instead of ChangeTableOptionRequest.
  - Adjust error handling in error.rs to use MetadataError.
  - Modify handle_alter.rs to handle TTL changes with ChangeOption.
  - Simplify region_request.rs by replacing ChangeRegionOption with ChangeOption and removing redundant code.
  - Remove the UnsupportedTableOptionChange error in table/src/error.rs.
  - Update metadata.rs to use ChangeOption for table options.
  - Remove the ChangeTableOptionRequest enum and related conversion code from requests.rs.
* feat/alter-ttl: Update greptime-proto dependency to revision 53ab9a9553
* chore: format code
* chore: update greptime-proto
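For reference, a minimal usage sketch of the new syntax, following the sqlness test added by this commit (the table name ato and the option values come from that test; '0d' and NULL both clear the TTL):

CREATE TABLE ato(i INTEGER, j TIMESTAMP TIME INDEX);

-- Set a one-day retention; SHOW CREATE TABLE then reports ttl = '1day' in the WITH clause.
ALTER TABLE ato SET 'ttl'='1d';

-- Update the option in place.
ALTER TABLE ato SET 'ttl'='2d';

-- Either form below removes the ttl option again ('0d' is treated as "no TTL").
ALTER TABLE ato SET 'ttl'=NULL;
ALTER TABLE ato SET 'ttl'='0d';

DROP TABLE ato;

Per the diff below, the new TTL takes effect in each region's in-memory options and is persisted in the DatanodeTableValue in metasrv rather than in the region manifest.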
Cargo.lock: 4 changes (generated)
@@ -2149,6 +2149,7 @@ dependencies = [
|
||||
"paste",
|
||||
"prost 0.12.6",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"table",
|
||||
]
|
||||
|
||||
@@ -4531,7 +4532,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9faf45e8bd83cba106ddfb09bba85784bf9ade2a#9faf45e8bd83cba106ddfb09bba85784bf9ade2a"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=255f87a3318ace3f88a67f76995a0e14910983f4#255f87a3318ace3f88a67f76995a0e14910983f4"
|
||||
dependencies = [
|
||||
"prost 0.12.6",
|
||||
"serde",
|
||||
@@ -11497,6 +11498,7 @@ dependencies = [
|
||||
"datatypes",
|
||||
"derive_builder 0.12.0",
|
||||
"futures",
|
||||
"humantime",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu 0.8.5",
|
||||
|
||||
@@ -121,7 +121,7 @@ etcd-client = { version = "0.13" }
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9faf45e8bd83cba106ddfb09bba85784bf9ade2a" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "255f87a3318ace3f88a67f76995a0e14910983f4" }
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
|
||||
@@ -18,6 +18,7 @@ common-time.workspace = true
|
||||
datatypes.workspace = true
|
||||
prost.workspace = true
|
||||
snafu.workspace = true
|
||||
store-api.workspace = true
|
||||
table.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -22,12 +22,13 @@ use api::v1::{
|
||||
use common_query::AddColumnLocation;
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::region_request::ChangeOption;
|
||||
use table::metadata::TableId;
|
||||
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, ChangeColumnTypeRequest};
|
||||
|
||||
use crate::error::{
|
||||
InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result,
|
||||
UnknownLocationTypeSnafu,
|
||||
InvalidChangeTableOptionRequestSnafu, InvalidColumnDefSnafu, MissingFieldSnafu,
|
||||
MissingTimestampColumnSnafu, Result, UnknownLocationTypeSnafu,
|
||||
};
|
||||
|
||||
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
|
||||
@@ -92,6 +93,15 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
|
||||
Kind::RenameTable(RenameTable { new_table_name }) => {
|
||||
AlterKind::RenameTable { new_table_name }
|
||||
}
|
||||
Kind::ChangeTableOptions(api::v1::ChangeTableOptions {
|
||||
change_table_options,
|
||||
}) => AlterKind::ChangeTableOptions {
|
||||
options: change_table_options
|
||||
.iter()
|
||||
.map(ChangeOption::try_from)
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.context(InvalidChangeTableOptionRequestSnafu)?,
|
||||
},
|
||||
};
|
||||
|
||||
let request = AlterTableRequest {
|
||||
|
||||
@@ -19,6 +19,7 @@ use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use snafu::{Location, Snafu};
|
||||
use store_api::metadata::MetadataError;
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
@@ -118,6 +119,12 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid change table option request"))]
|
||||
InvalidChangeTableOptionRequest {
|
||||
#[snafu(source)]
|
||||
error: MetadataError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -141,6 +148,7 @@ impl ErrorExt for Error {
|
||||
Error::UnknownColumnDataType { .. } | Error::InvalidFulltextColumnType { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
Error::InvalidChangeTableOptionRequest { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -43,10 +43,10 @@ impl AlterLogicalTablesProcedure {
|
||||
&self.data.physical_columns,
|
||||
);
|
||||
|
||||
// Updates physical table's metadata
|
||||
// Updates physical table's metadata, and we don't need to touch per-region settings.
|
||||
self.context
|
||||
.table_metadata_manager
|
||||
.update_table_info(physical_table_info, new_raw_table_info)
|
||||
.update_table_info(physical_table_info, None, new_raw_table_info)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -43,10 +43,10 @@ use crate::ddl::DdlContext;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::instruction::CacheIdent;
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::DeserializedValueWithBytes;
|
||||
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
|
||||
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
|
||||
use crate::rpc::ddl::AlterTableTask;
|
||||
use crate::rpc::router::{find_leader_regions, find_leaders};
|
||||
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution};
|
||||
use crate::{metrics, ClusterId};
|
||||
|
||||
/// The alter table procedure
|
||||
@@ -101,6 +101,9 @@ impl AlterTableProcedure {
|
||||
.get_physical_table_route(table_id)
|
||||
.await?;
|
||||
|
||||
self.data.region_distribution =
|
||||
Some(region_distribution(&physical_table_route.region_routes));
|
||||
|
||||
let leaders = find_leaders(&physical_table_route.region_routes);
|
||||
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
|
||||
|
||||
@@ -161,8 +164,14 @@ impl AlterTableProcedure {
|
||||
self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value)
|
||||
.await?;
|
||||
} else {
|
||||
self.on_update_metadata_for_alter(new_info.into(), table_info_value)
|
||||
.await?;
|
||||
// region distribution is set in submit_alter_region_requests
|
||||
let region_distribution = self.data.region_distribution.as_ref().unwrap().clone();
|
||||
self.on_update_metadata_for_alter(
|
||||
new_info.into(),
|
||||
region_distribution,
|
||||
table_info_value,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
info!("Updated table metadata for table {table_ref}, table_id: {table_id}");
|
||||
@@ -271,6 +280,8 @@ pub struct AlterTableData {
|
||||
table_id: TableId,
|
||||
/// Table info value before alteration.
|
||||
table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
|
||||
/// Region distribution for table in case we need to update region options.
|
||||
region_distribution: Option<RegionDistribution>,
|
||||
}
|
||||
|
||||
impl AlterTableData {
|
||||
@@ -281,6 +292,7 @@ impl AlterTableData {
|
||||
table_id,
|
||||
cluster_id,
|
||||
table_info_value: None,
|
||||
region_distribution: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -106,6 +106,7 @@ fn create_proto_alter_kind(
|
||||
})))
|
||||
}
|
||||
Kind::RenameTable(_) => Ok(None),
|
||||
Kind::ChangeTableOptions(v) => Ok(Some(alter_request::Kind::ChangeTableOptions(v.clone()))),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ use table::requests::AlterKind;
|
||||
use crate::ddl::alter_table::AlterTableProcedure;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::DeserializedValueWithBytes;
|
||||
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
|
||||
|
||||
impl AlterTableProcedure {
|
||||
/// Builds new_meta
|
||||
@@ -51,7 +51,9 @@ impl AlterTableProcedure {
|
||||
AlterKind::RenameTable { new_table_name } => {
|
||||
new_info.name = new_table_name.to_string();
|
||||
}
|
||||
AlterKind::DropColumns { .. } | AlterKind::ChangeColumnTypes { .. } => {}
|
||||
AlterKind::DropColumns { .. }
|
||||
| AlterKind::ChangeColumnTypes { .. }
|
||||
| AlterKind::ChangeTableOptions { .. } => {}
|
||||
}
|
||||
|
||||
Ok(new_info)
|
||||
@@ -75,11 +77,16 @@ impl AlterTableProcedure {
|
||||
pub(crate) async fn on_update_metadata_for_alter(
|
||||
&self,
|
||||
new_table_info: RawTableInfo,
|
||||
region_distribution: RegionDistribution,
|
||||
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
|
||||
) -> Result<()> {
|
||||
let table_metadata_manager = &self.context.table_metadata_manager;
|
||||
table_metadata_manager
|
||||
.update_table_info(current_table_info_value, new_table_info)
|
||||
.update_table_info(
|
||||
current_table_info_value,
|
||||
Some(region_distribution),
|
||||
new_table_info,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -58,10 +58,10 @@ impl CreateLogicalTablesProcedure {
|
||||
&new_table_info.name,
|
||||
);
|
||||
|
||||
// Update physical table's metadata
|
||||
// Update physical table's metadata and we don't need to touch per-region settings.
|
||||
self.context
|
||||
.table_metadata_manager
|
||||
.update_table_info(&physical_table_info, new_table_info)
|
||||
.update_table_info(&physical_table_info, None, new_table_info)
|
||||
.await?;
|
||||
|
||||
// Invalid physical table cache
|
||||
|
||||
@@ -29,7 +29,10 @@ use crate::test_util::MockDatanodeHandler;
|
||||
#[async_trait::async_trait]
|
||||
impl MockDatanodeHandler for () {
|
||||
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
|
||||
unreachable!()
|
||||
Ok(RegionResponse {
|
||||
affected_rows: 0,
|
||||
extensions: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_query(
|
||||
|
||||
@@ -19,13 +19,14 @@ use std::sync::Arc;
|
||||
use api::v1::alter_expr::Kind;
|
||||
use api::v1::region::{region_request, RegionRequest};
|
||||
use api::v1::{
|
||||
AddColumn, AddColumns, AlterExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn,
|
||||
DropColumns, SemanticType,
|
||||
AddColumn, AddColumns, AlterExpr, ChangeTableOption, ChangeTableOptions, ColumnDataType,
|
||||
ColumnDef as PbColumnDef, DropColumn, DropColumns, SemanticType,
|
||||
};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use store_api::storage::RegionId;
|
||||
use table::requests::TTL_KEY;
|
||||
use tokio::sync::mpsc::{self};
|
||||
|
||||
use crate::ddl::alter_table::AlterTableProcedure;
|
||||
@@ -34,6 +35,7 @@ use crate::ddl::test_util::create_table::test_create_table_task;
|
||||
use crate::ddl::test_util::datanode_handler::{
|
||||
DatanodeWatcher, RequestOutdatedErrorDatanodeHandler,
|
||||
};
|
||||
use crate::key::datanode_table::DatanodeTableKey;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::peer::Peer;
|
||||
@@ -293,12 +295,21 @@ async fn test_on_update_metadata_add_columns() {
|
||||
let table_name = "foo";
|
||||
let table_id = 1024;
|
||||
let task = test_create_table_task(table_name, table_id);
|
||||
|
||||
let region_id = RegionId::new(table_id, 0);
|
||||
let mock_table_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::default()),
|
||||
follower_peers: vec![],
|
||||
leader_state: None,
|
||||
leader_down_since: None,
|
||||
}];
|
||||
// Puts a value to table name key.
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(vec![]),
|
||||
TableRouteValue::physical(mock_table_routes),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
@@ -326,6 +337,7 @@ async fn test_on_update_metadata_add_columns() {
|
||||
let mut procedure =
|
||||
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context.clone()).unwrap();
|
||||
procedure.on_prepare().await.unwrap();
|
||||
procedure.submit_alter_region_requests().await.unwrap();
|
||||
procedure.on_update_metadata().await.unwrap();
|
||||
|
||||
let table_info = ddl_context
|
||||
@@ -343,3 +355,76 @@ async fn test_on_update_metadata_add_columns() {
|
||||
table_info.meta.next_column_id
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_update_table_options() {
|
||||
let node_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(node_manager);
|
||||
let cluster_id = 1;
|
||||
let table_name = "foo";
|
||||
let table_id = 1024;
|
||||
let task = test_create_table_task(table_name, table_id);
|
||||
|
||||
let region_id = RegionId::new(table_id, 0);
|
||||
let mock_table_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::default()),
|
||||
follower_peers: vec![],
|
||||
leader_state: None,
|
||||
leader_down_since: None,
|
||||
}];
|
||||
// Puts a value to table name key.
|
||||
ddl_context
|
||||
.table_metadata_manager
|
||||
.create_table_metadata(
|
||||
task.table_info.clone(),
|
||||
TableRouteValue::physical(mock_table_routes),
|
||||
HashMap::new(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let task = AlterTableTask {
|
||||
alter_table: AlterExpr {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
kind: Some(Kind::ChangeTableOptions(ChangeTableOptions {
|
||||
change_table_options: vec![ChangeTableOption {
|
||||
key: TTL_KEY.to_string(),
|
||||
value: "1d".to_string(),
|
||||
}],
|
||||
})),
|
||||
},
|
||||
};
|
||||
let mut procedure =
|
||||
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context.clone()).unwrap();
|
||||
procedure.on_prepare().await.unwrap();
|
||||
procedure.submit_alter_region_requests().await.unwrap();
|
||||
procedure.on_update_metadata().await.unwrap();
|
||||
|
||||
let table_info = ddl_context
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.into_inner()
|
||||
.table_info;
|
||||
|
||||
let datanode_key = DatanodeTableKey::new(0, table_id);
|
||||
let region_info = ddl_context
|
||||
.table_metadata_manager
|
||||
.datanode_table_manager()
|
||||
.get(&datanode_key)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.region_info;
|
||||
|
||||
assert_eq!(
|
||||
region_info.region_options,
|
||||
HashMap::from(&table_info.meta.options)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -652,6 +652,18 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Datanode table info not found, table id: {}, datanode id: {}",
|
||||
table_id,
|
||||
datanode_id
|
||||
))]
|
||||
DatanodeTableInfoNotFound {
|
||||
datanode_id: DatanodeId,
|
||||
table_id: TableId,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -752,6 +764,7 @@ impl ErrorExt for Error {
|
||||
PostgresExecution { .. } => StatusCode::Internal,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
ConnectPostgres { .. } => StatusCode::Internal,
|
||||
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -133,7 +133,6 @@ use self::flow::flow_name::FlowNameValue;
|
||||
use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
|
||||
use self::table_route::{TableRouteManager, TableRouteValue};
|
||||
use self::tombstone::TombstoneManager;
|
||||
use crate::ddl::utils::region_storage_path;
|
||||
use crate::error::{self, Result, SerdeJsonSnafu};
|
||||
use crate::key::node_address::NodeAddressValue;
|
||||
use crate::key::table_route::TableRouteKey;
|
||||
@@ -593,8 +592,6 @@ impl TableMetadataManager {
|
||||
table_info.meta.region_numbers = region_numbers;
|
||||
let table_id = table_info.ident.table_id;
|
||||
let engine = table_info.meta.engine.clone();
|
||||
let region_storage_path =
|
||||
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
|
||||
|
||||
// Creates table name.
|
||||
let table_name = TableNameKey::new(
|
||||
@@ -606,7 +603,7 @@ impl TableMetadataManager {
|
||||
.table_name_manager()
|
||||
.build_create_txn(&table_name, table_id)?;
|
||||
|
||||
let region_options = (&table_info.meta.options).into();
|
||||
let region_options = table_info.to_region_options();
|
||||
// Creates table info.
|
||||
let table_info_value = TableInfoValue::new(table_info);
|
||||
let (create_table_info_txn, on_create_table_info_failure) = self
|
||||
@@ -625,6 +622,7 @@ impl TableMetadataManager {
|
||||
]);
|
||||
|
||||
if let TableRouteValue::Physical(x) = &table_route_value {
|
||||
let region_storage_path = table_info_value.region_storage_path();
|
||||
let create_datanode_table_txn = self.datanode_table_manager().build_create_txn(
|
||||
table_id,
|
||||
&engine,
|
||||
@@ -926,13 +924,15 @@ impl TableMetadataManager {
|
||||
}
|
||||
|
||||
/// Updates table info and returns an error if different metadata exists.
|
||||
/// And cascade-ly update all redundant table options for each region
|
||||
/// if region_distribution is present.
|
||||
pub async fn update_table_info(
|
||||
&self,
|
||||
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
|
||||
region_distribution: Option<RegionDistribution>,
|
||||
new_table_info: RawTableInfo,
|
||||
) -> Result<()> {
|
||||
let table_id = current_table_info_value.table_info.ident.table_id;
|
||||
|
||||
let new_table_info_value = current_table_info_value.update(new_table_info);
|
||||
|
||||
// Updates table info.
|
||||
@@ -940,8 +940,19 @@ impl TableMetadataManager {
|
||||
.table_info_manager()
|
||||
.build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
|
||||
|
||||
let mut r = self.kv_backend.txn(update_table_info_txn).await?;
|
||||
let txn = if let Some(region_distribution) = region_distribution {
|
||||
// region options induced from table info.
|
||||
let new_region_options = new_table_info_value.table_info.to_region_options();
|
||||
let update_datanode_table_options_txn = self
|
||||
.datanode_table_manager
|
||||
.build_update_table_options_txn(table_id, region_distribution, new_region_options)
|
||||
.await?;
|
||||
Txn::merge_all([update_table_info_txn, update_datanode_table_options_txn])
|
||||
} else {
|
||||
update_table_info_txn
|
||||
};
|
||||
|
||||
let mut r = self.kv_backend.txn(txn).await?;
|
||||
// Checks whether metadata was already updated.
|
||||
if !r.succeeded {
|
||||
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
|
||||
@@ -1669,12 +1680,12 @@ mod tests {
|
||||
DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone()));
|
||||
// should be ok.
|
||||
table_metadata_manager
|
||||
.update_table_info(¤t_table_info_value, new_table_info.clone())
|
||||
.update_table_info(¤t_table_info_value, None, new_table_info.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
// if table info was updated, it should be ok.
|
||||
table_metadata_manager
|
||||
.update_table_info(¤t_table_info_value, new_table_info.clone())
|
||||
.update_table_info(¤t_table_info_value, None, new_table_info.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -1696,7 +1707,7 @@ mod tests {
|
||||
// if the current_table_info_value is wrong, it should return an error.
|
||||
// The ABA problem.
|
||||
assert!(table_metadata_manager
|
||||
.update_table_info(&wrong_table_info_value, new_table_info)
|
||||
.update_table_info(&wrong_table_info_value, None, new_table_info)
|
||||
.await
|
||||
.is_err())
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use store_api::storage::RegionNumber;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use super::MetadataKey;
|
||||
use crate::error::{InvalidMetadataSnafu, Result};
|
||||
use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
|
||||
use crate::key::{
|
||||
MetadataValue, RegionDistribution, DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX,
|
||||
};
|
||||
@@ -209,6 +209,49 @@ impl DatanodeTableManager {
|
||||
Ok(txn)
|
||||
}
|
||||
|
||||
/// Builds a transaction to updates the redundant table options (including WAL options)
|
||||
/// for given table id, if provided.
|
||||
///
|
||||
/// Note that the provided `new_region_options` must be a
|
||||
/// complete set of all options rather than incremental changes.
|
||||
pub(crate) async fn build_update_table_options_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
region_distribution: RegionDistribution,
|
||||
new_region_options: HashMap<String, String>,
|
||||
) -> Result<Txn> {
|
||||
assert!(!region_distribution.is_empty());
|
||||
// safety: region_distribution must not be empty
|
||||
let (any_datanode, _) = region_distribution.first_key_value().unwrap();
|
||||
|
||||
let mut region_info = self
|
||||
.kv_backend
|
||||
.get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
|
||||
.await
|
||||
.transpose()
|
||||
.context(DatanodeTableInfoNotFoundSnafu {
|
||||
datanode_id: *any_datanode,
|
||||
table_id,
|
||||
})?
|
||||
.and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
|
||||
.region_info;
|
||||
// substitute region options only.
|
||||
region_info.region_options = new_region_options;
|
||||
|
||||
let mut txns = Vec::with_capacity(region_distribution.len());
|
||||
|
||||
for (datanode, regions) in region_distribution.into_iter() {
|
||||
let key = DatanodeTableKey::new(datanode, table_id);
|
||||
let key_bytes = key.to_bytes();
|
||||
let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
|
||||
.try_as_raw_value()?;
|
||||
txns.push(TxnOp::Put(key_bytes, value_bytes));
|
||||
}
|
||||
|
||||
let txn = Txn::new().and_then(txns);
|
||||
Ok(txn)
|
||||
}
|
||||
|
||||
/// Builds the update datanode table transactions. It only executes while the primary keys comparing successes.
|
||||
pub(crate) fn build_update_txn(
|
||||
&self,
|
||||
|
||||
@@ -23,6 +23,7 @@ use table::table_name::TableName;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use super::TABLE_INFO_KEY_PATTERN;
|
||||
use crate::ddl::utils::region_storage_path;
|
||||
use crate::error::{InvalidMetadataSnafu, Result};
|
||||
use crate::key::txn_helper::TxnOpGetResponseSet;
|
||||
use crate::key::{DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_INFO_KEY_PREFIX};
|
||||
@@ -125,6 +126,11 @@ impl TableInfoValue {
|
||||
table_name: self.table_info.name.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds storage path for all regions in table.
|
||||
pub fn region_storage_path(&self) -> String {
|
||||
region_storage_path(&self.table_info.catalog_name, &self.table_info.schema_name)
|
||||
}
|
||||
}
|
||||
|
||||
pub type TableInfoManagerRef = Arc<TableInfoManager>;
|
||||
|
||||
@@ -98,6 +98,18 @@ impl VersionControl {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Applies region option changes and generates a new version.
|
||||
pub(crate) fn alter_options(&self, options: RegionOptions) {
|
||||
let version = self.current().version;
|
||||
let new_version = Arc::new(
|
||||
VersionBuilder::from_version(version)
|
||||
.options(options)
|
||||
.build(),
|
||||
);
|
||||
let mut version_data = self.data.write().unwrap();
|
||||
version_data.version = new_version;
|
||||
}
|
||||
|
||||
/// Apply edit to current version.
|
||||
pub(crate) fn apply_edit(
|
||||
&self,
|
||||
|
||||
@@ -19,7 +19,7 @@ use std::sync::Arc;
|
||||
use common_telemetry::{debug, info};
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
|
||||
use store_api::region_request::RegionAlterRequest;
|
||||
use store_api::region_request::{AlterKind, ChangeOption, RegionAlterRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{
|
||||
@@ -27,6 +27,8 @@ use crate::error::{
|
||||
};
|
||||
use crate::flush::FlushReason;
|
||||
use crate::manifest::action::RegionChange;
|
||||
use crate::region::version::VersionRef;
|
||||
use crate::region::MitoRegionRef;
|
||||
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
|
||||
use crate::worker::RegionWorkerLoop;
|
||||
|
||||
@@ -45,6 +47,13 @@ impl<S> RegionWorkerLoop<S> {
|
||||
|
||||
// Get the version before alter.
|
||||
let version = region.version();
|
||||
|
||||
// fast path for memory state changes like options.
|
||||
if let AlterKind::ChangeRegionOptions { options } = request.kind {
|
||||
self.handle_alter_region_options(region, version, options, sender);
|
||||
return;
|
||||
}
|
||||
|
||||
if version.metadata.schema_version != request.schema_version {
|
||||
// This is possible if we retry the request.
|
||||
debug!(
|
||||
@@ -67,6 +76,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
sender.send(Err(e).context(InvalidRegionRequestSnafu));
|
||||
return;
|
||||
}
|
||||
|
||||
// Checks whether we need to alter the region.
|
||||
if !request.need_alter(&version.metadata) {
|
||||
debug!(
|
||||
@@ -111,7 +121,17 @@ impl<S> RegionWorkerLoop<S> {
|
||||
version.metadata.schema_version,
|
||||
region.metadata().schema_version
|
||||
);
|
||||
self.handle_alter_region_metadata(region, version, request, sender);
|
||||
}
|
||||
|
||||
/// Handles region metadata changes.
|
||||
fn handle_alter_region_metadata(
|
||||
&mut self,
|
||||
region: MitoRegionRef,
|
||||
version: VersionRef,
|
||||
request: RegionAlterRequest,
|
||||
sender: OptionOutputTx,
|
||||
) {
|
||||
let new_meta = match metadata_after_alteration(&version.metadata, request) {
|
||||
Ok(new_meta) => new_meta,
|
||||
Err(e) => {
|
||||
@@ -120,11 +140,38 @@ impl<S> RegionWorkerLoop<S> {
|
||||
}
|
||||
};
|
||||
// Persist the metadata to region's manifest.
|
||||
let change = RegionChange {
|
||||
metadata: new_meta.clone(),
|
||||
};
|
||||
let change = RegionChange { metadata: new_meta };
|
||||
self.handle_manifest_region_change(region, change, sender)
|
||||
}
|
||||
|
||||
/// Handles requests that changes region options, like TTL. It only affects memory state
|
||||
/// since changes are persisted in the `DatanodeTableValue` in metasrv.
|
||||
fn handle_alter_region_options(
|
||||
&mut self,
|
||||
region: MitoRegionRef,
|
||||
version: VersionRef,
|
||||
options: Vec<ChangeOption>,
|
||||
sender: OptionOutputTx,
|
||||
) {
|
||||
let mut current_options = version.options.clone();
|
||||
for option in options {
|
||||
match option {
|
||||
ChangeOption::TTL(new_ttl) => {
|
||||
info!(
|
||||
"Update region ttl: {}, previous: {:?} new: {:?}",
|
||||
region.region_id, current_options.ttl, new_ttl
|
||||
);
|
||||
if new_ttl.is_zero() {
|
||||
current_options.ttl = None;
|
||||
} else {
|
||||
current_options.ttl = Some(new_ttl);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
region.version_control.alter_options(current_options);
|
||||
sender.send(Ok(0));
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a metadata after applying the alter `request` to the old `metadata`.
|
||||
|
||||
@@ -22,6 +22,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::region_request::RegionOpenRequest;
|
||||
use store_api::storage::RegionId;
|
||||
use table::requests::STORAGE_KEY;
|
||||
|
||||
use crate::error::{
|
||||
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
|
||||
@@ -38,7 +39,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
region_id: RegionId,
|
||||
request: &RegionOpenRequest,
|
||||
) -> Result<()> {
|
||||
let object_store = if let Some(storage_name) = request.options.get("storage") {
|
||||
let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
|
||||
self.object_store_manager
|
||||
.find(storage_name)
|
||||
.context(ObjectStoreNotFoundSnafu {
|
||||
|
||||
@@ -18,9 +18,9 @@ use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::alter_expr::Kind;
|
||||
use api::v1::column_def::options_from_column_schema;
|
||||
use api::v1::{
|
||||
AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, ColumnDataType,
|
||||
ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
|
||||
DropColumns, ExpireAfter, RenameTable, SemanticType, TableName,
|
||||
AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, ChangeTableOptions,
|
||||
ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
|
||||
DropColumn, DropColumns, ExpireAfter, RenameTable, SemanticType, TableName,
|
||||
};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_grpc_expr::util::ColumnExpr;
|
||||
@@ -438,7 +438,7 @@ pub(crate) fn to_alter_expr(
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let kind = match alter_table.alter_operation() {
|
||||
let kind = match alter_table.alter_operation {
|
||||
AlterTableOperation::AddConstraint(_) => {
|
||||
return NotSupportedSnafu {
|
||||
feat: "ADD CONSTRAINT",
|
||||
@@ -451,7 +451,7 @@ pub(crate) fn to_alter_expr(
|
||||
} => Kind::AddColumns(AddColumns {
|
||||
add_columns: vec![AddColumn {
|
||||
column_def: Some(
|
||||
sql_column_def_to_grpc_column_def(column_def, Some(&query_ctx.timezone()))
|
||||
sql_column_def_to_grpc_column_def(&column_def, Some(&query_ctx.timezone()))
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?,
|
||||
),
|
||||
@@ -463,13 +463,13 @@ pub(crate) fn to_alter_expr(
|
||||
target_type,
|
||||
} => {
|
||||
let target_type =
|
||||
sql_data_type_to_concrete_data_type(target_type).context(ParseSqlSnafu)?;
|
||||
sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
|
||||
let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
|
||||
.map(|w| w.to_parts())
|
||||
.context(ColumnDataTypeSnafu)?;
|
||||
Kind::ChangeColumnTypes(ChangeColumnTypes {
|
||||
change_column_types: vec![ChangeColumnType {
|
||||
column_name: column_name.value.to_string(),
|
||||
column_name: column_name.value,
|
||||
target_type: target_type as i32,
|
||||
target_type_extension,
|
||||
}],
|
||||
@@ -483,6 +483,11 @@ pub(crate) fn to_alter_expr(
|
||||
AlterTableOperation::RenameTable { new_table_name } => Kind::RenameTable(RenameTable {
|
||||
new_table_name: new_table_name.to_string(),
|
||||
}),
|
||||
AlterTableOperation::ChangeTableOptions { options } => {
|
||||
Kind::ChangeTableOptions(ChangeTableOptions {
|
||||
change_table_options: options.into_iter().map(Into::into).collect(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
Ok(AlterExpr {
|
||||
@@ -744,7 +749,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_to_alter_change_column_type_expr() {
|
||||
let sql = "ALTER TABLE monitor MODIFY mem_usage STRING;";
|
||||
let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
|
||||
let stmt =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap()
|
||||
|
||||
@@ -15,12 +15,12 @@
|
||||
use common_query::AddColumnLocation;
|
||||
use snafu::ResultExt;
|
||||
use sqlparser::keywords::Keyword;
|
||||
use sqlparser::parser::ParserError;
|
||||
use sqlparser::parser::{Parser, ParserError};
|
||||
use sqlparser::tokenizer::Token;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::parser::ParserContext;
|
||||
use crate::statements::alter::{AlterTable, AlterTableOperation};
|
||||
use crate::statements::alter::{AlterTable, AlterTableOperation, ChangeTableOption};
|
||||
use crate::statements::statement::Statement;
|
||||
|
||||
impl ParserContext<'_> {
|
||||
@@ -94,6 +94,14 @@ impl ParserContext<'_> {
|
||||
}
|
||||
};
|
||||
AlterTableOperation::RenameTable { new_table_name }
|
||||
} else if self.parser.parse_keyword(Keyword::SET) {
|
||||
let options = self
|
||||
.parser
|
||||
.parse_comma_separated(parse_string_options)?
|
||||
.into_iter()
|
||||
.map(|(key, value)| ChangeTableOption { key, value })
|
||||
.collect();
|
||||
AlterTableOperation::ChangeTableOptions { options }
|
||||
} else {
|
||||
return Err(ParserError::ParserError(format!(
|
||||
"expect keyword ADD or DROP or MODIFY or RENAME after ALTER TABLE, found {}",
|
||||
@@ -104,6 +112,22 @@ impl ParserContext<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_string_options(parser: &mut Parser) -> std::result::Result<(String, String), ParserError> {
|
||||
let name = parser.parse_literal_string()?;
|
||||
parser.expect_token(&Token::Eq)?;
|
||||
let value = if parser.parse_keyword(Keyword::NULL) {
|
||||
"".to_string()
|
||||
} else if let Ok(v) = parser.parse_literal_string() {
|
||||
v
|
||||
} else {
|
||||
return Err(ParserError::ParserError(format!(
|
||||
"Unexpected option value for alter table statements, expect string literal or NULL, got: `{}`",
|
||||
parser.next_token()
|
||||
)));
|
||||
};
|
||||
Ok((name, value))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
@@ -272,7 +296,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let sql_2 = "ALTER TABLE my_metric_1 MODIFY a STRING";
|
||||
let sql_2 = "ALTER TABLE my_metric_1 MODIFY COLUMN a STRING";
|
||||
let mut result_2 = ParserContext::create_with_dialect(
|
||||
sql_2,
|
||||
&GreptimeDbDialect {},
|
||||
@@ -406,4 +430,44 @@ mod tests {
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn check_parse_alter_table(sql: &str, expected: &[(&str, &str)]) {
|
||||
let result =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap();
|
||||
assert_eq!(1, result.len());
|
||||
let Statement::Alter(alter) = &result[0] else {
|
||||
unreachable!()
|
||||
};
|
||||
assert_eq!("test_table", alter.table_name.0[0].value);
|
||||
let AlterTableOperation::ChangeTableOptions { options } = &alter.alter_operation else {
|
||||
unreachable!()
|
||||
};
|
||||
let res = options
|
||||
.iter()
|
||||
.map(|o| (o.key.as_str(), o.value.as_str()))
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(expected, &res);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_alter_column() {
|
||||
check_parse_alter_table("ALTER TABLE test_table SET 'a'='A';", &[("a", "A")]);
|
||||
check_parse_alter_table(
|
||||
"ALTER TABLE test_table SET 'a'='A','b'='B'",
|
||||
&[("a", "A"), ("b", "B")],
|
||||
);
|
||||
check_parse_alter_table(
|
||||
"ALTER TABLE test_table SET 'a'='A','b'='B','c'='C';",
|
||||
&[("a", "A"), ("b", "B"), ("c", "C")],
|
||||
);
|
||||
check_parse_alter_table("ALTER TABLE test_table SET 'a'=NULL;", &[("a", "")]);
|
||||
|
||||
ParserContext::create_with_dialect(
|
||||
"ALTER TABLE test_table SET a INTEGER",
|
||||
&GreptimeDbDialect {},
|
||||
ParseOptions::default(),
|
||||
)
|
||||
.unwrap_err();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,14 +14,15 @@
|
||||
|
||||
use std::fmt::{Debug, Display};
|
||||
|
||||
use api::v1;
|
||||
use common_query::AddColumnLocation;
|
||||
use sqlparser::ast::{ColumnDef, DataType, Ident, ObjectName, TableConstraint};
|
||||
use sqlparser_derive::{Visit, VisitMut};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub struct AlterTable {
|
||||
table_name: ObjectName,
|
||||
alter_operation: AlterTableOperation,
|
||||
pub table_name: ObjectName,
|
||||
pub alter_operation: AlterTableOperation,
|
||||
}
|
||||
|
||||
impl AlterTable {
|
||||
@@ -67,6 +68,8 @@ pub enum AlterTableOperation {
|
||||
column_name: Ident,
|
||||
target_type: DataType,
|
||||
},
|
||||
/// `MODIFY <table attrs key> = <table attr value>`
|
||||
ChangeTableOptions { options: Vec<ChangeTableOption> },
|
||||
/// `DROP COLUMN <name>`
|
||||
DropColumn { name: Ident },
|
||||
/// `RENAME <new_table_name>`
|
||||
@@ -97,6 +100,27 @@ impl Display for AlterTableOperation {
|
||||
} => {
|
||||
write!(f, r#"MODIFY COLUMN {column_name} {target_type}"#)
|
||||
}
|
||||
AlterTableOperation::ChangeTableOptions { options } => {
|
||||
for ChangeTableOption { key, value } in options {
|
||||
write!(f, r#"MODIFY '{key}'='{value}', "#)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub struct ChangeTableOption {
|
||||
pub key: String,
|
||||
pub value: String,
|
||||
}
|
||||
|
||||
impl From<ChangeTableOption> for v1::ChangeTableOption {
|
||||
fn from(c: ChangeTableOption) -> Self {
|
||||
v1::ChangeTableOption {
|
||||
key: c.key,
|
||||
value: c.value,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ datafusion-physical-plan.workspace = true
|
||||
datatypes.workspace = true
|
||||
derive_builder.workspace = true
|
||||
futures.workspace = true
|
||||
humantime.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
snafu.workspace = true
|
||||
|
||||
@@ -523,6 +523,9 @@ impl RegionMetadataBuilder {
|
||||
AlterKind::AddColumns { columns } => self.add_columns(columns)?,
|
||||
AlterKind::DropColumns { names } => self.drop_columns(&names),
|
||||
AlterKind::ChangeColumnTypes { columns } => self.change_column_types(columns),
|
||||
AlterKind::ChangeRegionOptions { options: _ } => {
|
||||
// nothing to be done with RegionMetadata
|
||||
}
|
||||
}
|
||||
Ok(self)
|
||||
}
|
||||
@@ -738,6 +741,14 @@ pub enum MetadataError {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid region option change request, key: {}, value: {}", key, value))]
|
||||
InvalidRegionOptionChangeRequest {
|
||||
key: String,
|
||||
value: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for MetadataError {
|
||||
|
||||
@@ -14,25 +14,29 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::add_column_location::LocationType;
|
||||
use api::v1::region::alter_request::Kind;
|
||||
use api::v1::region::{
|
||||
alter_request, compact_request, region_request, AlterRequest, AlterRequests, CloseRequest,
|
||||
CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
|
||||
FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
|
||||
};
|
||||
use api::v1::{self, Rows, SemanticType};
|
||||
use api::v1::{self, ChangeTableOption, Rows, SemanticType};
|
||||
pub use common_base::AffectedRows;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt};
|
||||
use strum::IntoStaticStr;
|
||||
|
||||
use crate::logstore::entry;
|
||||
use crate::metadata::{
|
||||
ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, MetadataError,
|
||||
RegionMetadata, Result,
|
||||
ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionOptionChangeRequestSnafu,
|
||||
InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result,
|
||||
};
|
||||
use crate::mito_engine_options::TTL_KEY;
|
||||
use crate::path_utils::region_dir;
|
||||
use crate::storage::{ColumnId, RegionId, ScanRequest};
|
||||
|
||||
@@ -389,6 +393,8 @@ pub enum AlterKind {
|
||||
/// Columns to change.
|
||||
columns: Vec<ChangeColumnType>,
|
||||
},
|
||||
/// Change region options.
|
||||
ChangeRegionOptions { options: Vec<ChangeOption> },
|
||||
}
|
||||
|
||||
impl AlterKind {
|
||||
@@ -412,6 +418,7 @@ impl AlterKind {
|
||||
col_to_change.validate(metadata)?;
|
||||
}
|
||||
}
|
||||
AlterKind::ChangeRegionOptions { .. } => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -429,6 +436,11 @@ impl AlterKind {
|
||||
AlterKind::ChangeColumnTypes { columns } => columns
|
||||
.iter()
|
||||
.any(|col_to_change| col_to_change.need_alter(metadata)),
|
||||
AlterKind::ChangeRegionOptions { .. } => {
|
||||
// we need to update region options for `ChangeTableOptions`.
|
||||
// todo: we need to check if ttl has ever changed.
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -473,6 +485,13 @@ impl TryFrom<alter_request::Kind> for AlterKind {
|
||||
let names = x.drop_columns.into_iter().map(|x| x.name).collect();
|
||||
AlterKind::DropColumns { names }
|
||||
}
|
||||
Kind::ChangeTableOptions(change_options) => AlterKind::ChangeRegionOptions {
|
||||
options: change_options
|
||||
.change_table_options
|
||||
.iter()
|
||||
.map(TryFrom::try_from)
|
||||
.collect::<Result<Vec<_>>>()?,
|
||||
},
|
||||
};
|
||||
|
||||
Ok(alter_kind)
|
||||
@@ -639,6 +658,30 @@ impl From<v1::ChangeColumnType> for ChangeColumnType {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
|
||||
pub enum ChangeOption {
|
||||
TTL(Duration),
|
||||
}
|
||||
|
||||
impl TryFrom<&ChangeTableOption> for ChangeOption {
|
||||
type Error = MetadataError;
|
||||
|
||||
fn try_from(value: &ChangeTableOption) -> std::result::Result<Self, Self::Error> {
|
||||
let ChangeTableOption { key, value } = value;
|
||||
if key == TTL_KEY {
|
||||
let ttl = if value.is_empty() {
|
||||
Duration::from_secs(0)
|
||||
} else {
|
||||
humantime::parse_duration(value)
|
||||
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
|
||||
};
|
||||
Ok(Self::TTL(ttl))
|
||||
} else {
|
||||
InvalidRegionOptionChangeRequestSnafu { key, value }.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct RegionFlushRequest {
|
||||
pub row_group_size: Option<usize>,
|
||||
|
||||
@@ -137,6 +137,9 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Table options value is not valid, key: `{}`, value: `{}`", key, value))]
|
||||
InvalidTableOptionValue { key: String, value: String },
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -157,6 +160,7 @@ impl ErrorExt for Error {
|
||||
Error::Unsupported { .. } => StatusCode::Unsupported,
|
||||
Error::ParseTableOption { .. } => StatusCode::InvalidArguments,
|
||||
Error::MissingTimeIndexColumn { .. } => StatusCode::IllegalState,
|
||||
Error::InvalidTableOptionValue { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRe
|
||||
use derive_builder::Builder;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::region_request::ChangeOption;
|
||||
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -209,9 +210,35 @@ impl TableMeta {
|
||||
.next_column_id(self.next_column_id);
|
||||
Ok(meta_builder)
|
||||
}
|
||||
AlterKind::ChangeTableOptions { options } => self.change_table_options(options),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a [TableMetaBuilder] with modified table options.
|
||||
fn change_table_options(&self, requests: &[ChangeOption]) -> Result<TableMetaBuilder> {
|
||||
let mut new_options = self.options.clone();
|
||||
|
||||
for request in requests {
|
||||
match request {
|
||||
ChangeOption::TTL(new_ttl) => {
|
||||
if new_ttl.is_zero() {
|
||||
new_options.ttl = None;
|
||||
} else {
|
||||
new_options.ttl = Some(*new_ttl);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut builder = TableMetaBuilder::default();
|
||||
builder
|
||||
.options(new_options)
|
||||
.schema(self.schema.clone())
|
||||
.primary_key_indices(self.primary_key_indices.clone())
|
||||
.engine(self.engine.clone())
|
||||
.next_column_id(self.next_column_id);
|
||||
Ok(builder)
|
||||
}
|
||||
|
||||
/// Allocate a new column for the table.
|
||||
///
|
||||
/// This method would bump the `next_column_id` of the meta.
|
||||
@@ -823,6 +850,13 @@ impl RawTableInfo {
|
||||
self.meta.primary_key_indices = primary_key_indices;
|
||||
self.meta.value_indices = value_indices;
|
||||
}
|
||||
|
||||
/// Extracts region options from table info.
|
||||
///
|
||||
/// All "region options" are actually a copy of table options for redundancy.
|
||||
pub fn to_region_options(&self) -> HashMap<String, String> {
|
||||
HashMap::from(&self.meta.options)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TableInfo> for RawTableInfo {
|
||||
@@ -857,7 +891,6 @@ impl TryFrom<RawTableInfo> for TableInfo {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
|
||||
@@ -30,6 +30,7 @@ use greptime_proto::v1::region::compact_request;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY};
|
||||
use store_api::mito_engine_options::is_mito_engine_option_key;
|
||||
use store_api::region_request::ChangeOption;
|
||||
|
||||
use crate::error::{ParseTableOptionSnafu, Result};
|
||||
use crate::metadata::{TableId, TableVersion};
|
||||
@@ -80,7 +81,7 @@ pub struct TableOptions {
|
||||
}
|
||||
|
||||
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
|
||||
pub const TTL_KEY: &str = "ttl";
|
||||
pub const TTL_KEY: &str = store_api::mito_engine_options::TTL_KEY;
|
||||
pub const STORAGE_KEY: &str = "storage";
|
||||
pub const COMMENT_KEY: &str = "comment";
|
||||
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
|
||||
@@ -212,8 +213,35 @@ pub enum AlterKind {
|
||||
RenameTable {
|
||||
new_table_name: String,
|
||||
},
|
||||
ChangeTableOptions {
|
||||
options: Vec<ChangeOption>,
|
||||
},
|
||||
}
|
||||
|
||||
// #[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
// pub enum ChangeTableOptionRequest {
|
||||
// TTL(Duration),
|
||||
// }
|
||||
|
||||
// impl TryFrom<&ChangeTableOption> for ChangeTableOptionRequest {
|
||||
// type Error = Error;
|
||||
//
|
||||
// fn try_from(value: &ChangeTableOption) -> std::result::Result<Self, Self::Error> {
|
||||
// let ChangeTableOption { key, value } = value;
|
||||
// if key == TTL_KEY {
|
||||
// let ttl = if value.is_empty() {
|
||||
// Duration::from_secs(0)
|
||||
// } else {
|
||||
// humantime::parse_duration(value)
|
||||
// .map_err(|_| error::InvalidTableOptionValueSnafu { key, value }.build())?
|
||||
// };
|
||||
// Ok(Self::TTL(ttl))
|
||||
// } else {
|
||||
// UnsupportedTableOptionChangeSnafu { key }.fail()
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InsertRequest {
|
||||
pub catalog_name: String,
|
||||
|
||||
@@ -0,0 +1,88 @@
CREATE TABLE ato(i INTEGER, j TIMESTAMP TIME INDEX);

Affected Rows: 0

ALTER TABLE ato SET 'ttl'='1d';

Affected Rows: 0

SHOW CREATE TABLE ato;

+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '1day' |
| | ) |
+-------+------------------------------------+

ALTER TABLE ato SET 'ttl'='2d';

Affected Rows: 0

SHOW CREATE TABLE ato;

+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | ttl = '2days' |
| | ) |
+-------+------------------------------------+

ALTER TABLE ato SET 'ttl'=NULL;

Affected Rows: 0

SHOW CREATE TABLE ato;

+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------+------------------------------------+

ALTER TABLE ato SET 'ttl'='0d';

Affected Rows: 0

SHOW CREATE TABLE ato;

+-------+------------------------------------+
| Table | Create Table |
+-------+------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------+------------------------------------+

DROP TABLE ato;

Affected Rows: 0
tests/cases/standalone/common/alter/alter_table_options.sql (new file, 19 lines)
@@ -0,0 +1,19 @@
CREATE TABLE ato(i INTEGER, j TIMESTAMP TIME INDEX);

ALTER TABLE ato SET 'ttl'='1d';

SHOW CREATE TABLE ato;

ALTER TABLE ato SET 'ttl'='2d';

SHOW CREATE TABLE ato;

ALTER TABLE ato SET 'ttl'=NULL;

SHOW CREATE TABLE ato;

ALTER TABLE ato SET 'ttl'='0d';

SHOW CREATE TABLE ato;

DROP TABLE ato;
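A note on accepted value forms, based on the parser test in this commit (the table name test_table is the test fixture): keys and values are written as string literals, NULL is accepted as a value and recorded as an empty string, and unquoted forms are rejected. A short sketch, assuming that parser behavior:

-- Accepted: quoted keys and values, multiple options separated by commas.
ALTER TABLE test_table SET 'a'='A', 'b'='B';
-- Accepted: NULL clears the option (recorded as an empty value).
ALTER TABLE test_table SET 'a'=NULL;
-- Rejected by the parser: an unquoted value such as SET a INTEGER.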
@@ -6,23 +6,23 @@ INSERT INTO test VALUES (1, 1, 1, false), (2, 2, 2, true);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
ALTER TABLE test MODIFY "I" STRING;
|
||||
ALTER TABLE test MODIFY COLUMN "I" STRING;
|
||||
|
||||
Error: 4002(TableColumnNotFound), Column I not exists in table test
|
||||
|
||||
ALTER TABLE test MODIFY k DATE;
|
||||
ALTER TABLE test MODIFY COLUMN k DATE;
|
||||
|
||||
Error: 1004(InvalidArguments), Invalid alter table(test) request: column 'k' cannot be cast automatically to type 'Date'
|
||||
|
||||
ALTER TABLE test MODIFY id STRING;
|
||||
ALTER TABLE test MODIFY COLUMN id STRING;
|
||||
|
||||
Error: 1004(InvalidArguments), Invalid alter table(test) request: Not allowed to change primary key index column 'id'
|
||||
|
||||
ALTER TABLE test MODIFY j STRING;
|
||||
ALTER TABLE test MODIFY COLUMN j STRING;
|
||||
|
||||
Error: 1004(InvalidArguments), Invalid alter table(test) request: Not allowed to change timestamp index column 'j' datatype
|
||||
|
||||
ALTER TABLE test MODIFY I STRING;
|
||||
ALTER TABLE test MODIFY COLUMN I STRING;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -61,7 +61,7 @@ DESCRIBE test;
|
||||
| k | Boolean | | YES | | FIELD |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
|
||||
ALTER TABLE test MODIFY I INTEGER;
|
||||
ALTER TABLE test MODIFY COLUMN I INTEGER;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
|
||||
@@ -2,15 +2,15 @@ CREATE TABLE test(id INTEGER PRIMARY KEY, i INTEGER NULL, j TIMESTAMP TIME INDEX
|
||||
|
||||
INSERT INTO test VALUES (1, 1, 1, false), (2, 2, 2, true);
|
||||
|
||||
ALTER TABLE test MODIFY "I" STRING;
|
||||
ALTER TABLE test MODIFY COLUMN "I" STRING;
|
||||
|
||||
ALTER TABLE test MODIFY k DATE;
|
||||
ALTER TABLE test MODIFY COLUMN k DATE;
|
||||
|
||||
ALTER TABLE test MODIFY id STRING;
|
||||
ALTER TABLE test MODIFY COLUMN id STRING;
|
||||
|
||||
ALTER TABLE test MODIFY j STRING;
|
||||
ALTER TABLE test MODIFY COLUMN j STRING;
|
||||
|
||||
ALTER TABLE test MODIFY I STRING;
|
||||
ALTER TABLE test MODIFY COLUMN I STRING;
|
||||
|
||||
SELECT * FROM test;
|
||||
|
||||
@@ -21,7 +21,7 @@ SELECT * FROM test;
|
||||
|
||||
DESCRIBE test;
|
||||
|
||||
ALTER TABLE test MODIFY I INTEGER;
|
||||
ALTER TABLE test MODIFY COLUMN I INTEGER;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
SELECT * FROM test;
|
||||
|
||||
@@ -15,7 +15,7 @@ SELECT * FROM test;
|
||||
| 1970-01-01T00:00:00.002 | 2 |
|
||||
+-------------------------+---+
|
||||
|
||||
ALTER TABLE test MODIFY j STRING;
|
||||
ALTER TABLE test MODIFY COLUMN j STRING;
|
||||
|
||||
Error: 1004(InvalidArguments), Invalid alter table(test) request: column 'j' must be nullable to ensure safe conversion.
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ INSERT INTO test VALUES (1, 1), (2, 2);
|
||||
|
||||
SELECT * FROM test;
|
||||
|
||||
ALTER TABLE test MODIFY j STRING;
|
||||
ALTER TABLE test MODIFY COLUMN j STRING;
|
||||
|
||||
SELECT * FROM test;
|
||||
|
||||
|
||||