mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-31 04:10:38 +00:00
feat: support add if not exists in the gRPC alter kind (#5273)
* test: test adding existing columns * chore: add more checks to AlterKind * chore: update logs * fix: check and build table info first * feat: Add add_if_not_exists flag to alter expr * feat: skip existing columns when building alter kind * checks in make_region_alter_kind() * reuse the alter kind * test: fix tests in common-meta * chore: fix typos * chore: update comments
This commit is contained in:
@@ -60,6 +60,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
|
||||
column_schema: schema,
|
||||
is_key: column_def.semantic_type == SemanticType::Tag as i32,
|
||||
location: parse_location(ac.location)?,
|
||||
add_if_not_exists: ac.add_if_not_exists,
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
@@ -220,6 +221,7 @@ mod tests {
|
||||
..Default::default()
|
||||
}),
|
||||
location: None,
|
||||
add_if_not_exists: true,
|
||||
}],
|
||||
})),
|
||||
};
|
||||
@@ -240,6 +242,7 @@ mod tests {
|
||||
add_column.column_schema.data_type
|
||||
);
|
||||
assert_eq!(None, add_column.location);
|
||||
assert!(add_column.add_if_not_exists);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -265,6 +268,7 @@ mod tests {
|
||||
location_type: LocationType::First.into(),
|
||||
after_column_name: String::default(),
|
||||
}),
|
||||
add_if_not_exists: false,
|
||||
},
|
||||
AddColumn {
|
||||
column_def: Some(ColumnDef {
|
||||
@@ -280,6 +284,7 @@ mod tests {
|
||||
location_type: LocationType::After.into(),
|
||||
after_column_name: "ts".to_string(),
|
||||
}),
|
||||
add_if_not_exists: true,
|
||||
},
|
||||
],
|
||||
})),
|
||||
@@ -308,6 +313,7 @@ mod tests {
|
||||
}),
|
||||
add_column.location
|
||||
);
|
||||
assert!(add_column.add_if_not_exists);
|
||||
|
||||
let add_column = add_columns.pop().unwrap();
|
||||
assert!(!add_column.is_key);
|
||||
@@ -317,6 +323,7 @@ mod tests {
|
||||
add_column.column_schema.data_type
|
||||
);
|
||||
assert_eq!(Some(AddColumnLocation::First), add_column.location);
|
||||
assert!(!add_column.add_if_not_exists);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -299,6 +299,7 @@ mod tests {
|
||||
.unwrap()
|
||||
)
|
||||
);
|
||||
assert!(host_column.add_if_not_exists);
|
||||
|
||||
let memory_column = &add_columns.add_columns[1];
|
||||
assert_eq!(
|
||||
@@ -311,6 +312,7 @@ mod tests {
|
||||
.unwrap()
|
||||
)
|
||||
);
|
||||
assert!(host_column.add_if_not_exists);
|
||||
|
||||
let time_column = &add_columns.add_columns[2];
|
||||
assert_eq!(
|
||||
@@ -323,6 +325,7 @@ mod tests {
|
||||
.unwrap()
|
||||
)
|
||||
);
|
||||
assert!(host_column.add_if_not_exists);
|
||||
|
||||
let interval_column = &add_columns.add_columns[3];
|
||||
assert_eq!(
|
||||
@@ -335,6 +338,7 @@ mod tests {
|
||||
.unwrap()
|
||||
)
|
||||
);
|
||||
assert!(host_column.add_if_not_exists);
|
||||
|
||||
let decimal_column = &add_columns.add_columns[4];
|
||||
assert_eq!(
|
||||
@@ -352,6 +356,7 @@ mod tests {
|
||||
.unwrap()
|
||||
)
|
||||
);
|
||||
assert!(host_column.add_if_not_exists);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -192,6 +192,9 @@ pub fn build_create_table_expr(
|
||||
Ok(expr)
|
||||
}
|
||||
|
||||
/// Find columns that are not present in the schema and return them as `AddColumns`
|
||||
/// for adding columns automatically.
|
||||
/// It always sets `add_if_not_exists` to `true` for now.
|
||||
pub fn extract_new_columns(
|
||||
schema: &Schema,
|
||||
column_exprs: Vec<ColumnExpr>,
|
||||
@@ -213,6 +216,7 @@ pub fn extract_new_columns(
|
||||
AddColumn {
|
||||
column_def,
|
||||
location: None,
|
||||
add_if_not_exists: true,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -105,7 +105,7 @@ impl AlterLogicalTablesProcedure {
|
||||
.context(ConvertAlterTableRequestSnafu)?;
|
||||
let new_meta = table_info
|
||||
.meta
|
||||
.builder_with_alter_kind(table_ref.table, &request.alter_kind, true)
|
||||
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
|
||||
.context(error::TableSnafu)?
|
||||
.build()
|
||||
.with_context(|_| error::BuildTableMetaSnafu {
|
||||
|
||||
@@ -28,13 +28,13 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
|
||||
use common_procedure::{
|
||||
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
|
||||
};
|
||||
use common_telemetry::{debug, info};
|
||||
use common_telemetry::{debug, error, info};
|
||||
use futures::future;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
use strum::AsRefStr;
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
use table::metadata::{RawTableInfo, TableId, TableInfo};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
@@ -51,10 +51,14 @@ use crate::{metrics, ClusterId};
|
||||
|
||||
/// The alter table procedure
|
||||
pub struct AlterTableProcedure {
|
||||
// The runtime context.
|
||||
/// The runtime context.
|
||||
context: DdlContext,
|
||||
// The serialized data.
|
||||
/// The serialized data.
|
||||
data: AlterTableData,
|
||||
/// Cached new table metadata in the prepare step.
|
||||
/// If we recover the procedure from json, then the table info value is not cached.
|
||||
/// But we already validated it in the prepare step.
|
||||
new_table_info: Option<TableInfo>,
|
||||
}
|
||||
|
||||
impl AlterTableProcedure {
|
||||
@@ -70,18 +74,31 @@ impl AlterTableProcedure {
|
||||
Ok(Self {
|
||||
context,
|
||||
data: AlterTableData::new(task, table_id, cluster_id),
|
||||
new_table_info: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
|
||||
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
Ok(AlterTableProcedure { context, data })
|
||||
Ok(AlterTableProcedure {
|
||||
context,
|
||||
data,
|
||||
new_table_info: None,
|
||||
})
|
||||
}
|
||||
|
||||
// Checks whether the table exists.
|
||||
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
|
||||
self.check_alter().await?;
|
||||
self.fill_table_info().await?;
|
||||
|
||||
// Validates the request and builds the new table info.
|
||||
// We need to build the new table info here because we should ensure the alteration
|
||||
// is valid in `UpdateMeta` state as we already altered the region.
|
||||
// Safety: `fill_table_info()` already set it.
|
||||
let table_info_value = self.data.table_info_value.as_ref().unwrap();
|
||||
self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
|
||||
|
||||
// Safety: Checked in `AlterTableProcedure::new`.
|
||||
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
|
||||
if matches!(alter_kind, Kind::RenameTable { .. }) {
|
||||
@@ -106,6 +123,14 @@ impl AlterTableProcedure {
|
||||
|
||||
let leaders = find_leaders(&physical_table_route.region_routes);
|
||||
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
|
||||
let alter_kind = self.make_region_alter_kind()?;
|
||||
|
||||
info!(
|
||||
"Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
|
||||
self.data.table_ref(),
|
||||
table_id,
|
||||
alter_kind,
|
||||
);
|
||||
|
||||
for datanode in leaders {
|
||||
let requester = self.context.node_manager.datanode(&datanode).await;
|
||||
@@ -113,7 +138,7 @@ impl AlterTableProcedure {
|
||||
|
||||
for region in regions {
|
||||
let region_id = RegionId::new(table_id, region);
|
||||
let request = self.make_alter_region_request(region_id)?;
|
||||
let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
|
||||
debug!("Submitting {request:?} to {datanode}");
|
||||
|
||||
let datanode = datanode.clone();
|
||||
@@ -150,7 +175,15 @@ impl AlterTableProcedure {
|
||||
let table_ref = self.data.table_ref();
|
||||
// Safety: checked before.
|
||||
let table_info_value = self.data.table_info_value.as_ref().unwrap();
|
||||
let new_info = self.build_new_table_info(&table_info_value.table_info)?;
|
||||
// Gets the table info from the cache or builds it.
|
||||
let new_info = match &self.new_table_info {
|
||||
Some(cached) => cached.clone(),
|
||||
None => self.build_new_table_info(&table_info_value.table_info)
|
||||
.inspect_err(|e| {
|
||||
// We already check the table info in the prepare step so this should not happen.
|
||||
error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
|
||||
})?,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Starting update table: {} metadata, new table info {:?}",
|
||||
@@ -174,7 +207,7 @@ impl AlterTableProcedure {
|
||||
.await?;
|
||||
}
|
||||
|
||||
info!("Updated table metadata for table {table_ref}, table_id: {table_id}");
|
||||
info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
|
||||
self.data.state = AlterTableState::InvalidateTableCache;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::region::region_request::Body;
|
||||
use api::v1::region::{
|
||||
@@ -27,13 +29,15 @@ use crate::ddl::alter_table::AlterTableProcedure;
|
||||
use crate::error::{InvalidProtoMsgSnafu, Result};
|
||||
|
||||
impl AlterTableProcedure {
|
||||
/// Makes alter region request.
|
||||
pub(crate) fn make_alter_region_request(&self, region_id: RegionId) -> Result<RegionRequest> {
|
||||
// Safety: Checked in `AlterTableProcedure::new`.
|
||||
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
|
||||
/// Makes alter region request from existing an alter kind.
|
||||
/// Region alter request always add columns if not exist.
|
||||
pub(crate) fn make_alter_region_request(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
kind: Option<alter_request::Kind>,
|
||||
) -> Result<RegionRequest> {
|
||||
// Safety: checked
|
||||
let table_info = self.data.table_info().unwrap();
|
||||
let kind = create_proto_alter_kind(table_info, alter_kind)?;
|
||||
|
||||
Ok(RegionRequest {
|
||||
header: Some(RegionRequestHeader {
|
||||
@@ -47,45 +51,66 @@ impl AlterTableProcedure {
|
||||
})),
|
||||
})
|
||||
}
|
||||
|
||||
/// Makes alter kind proto that all regions can reuse.
|
||||
/// Region alter request always add columns if not exist.
|
||||
pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
|
||||
// Safety: Checked in `AlterTableProcedure::new`.
|
||||
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
|
||||
// Safety: checked
|
||||
let table_info = self.data.table_info().unwrap();
|
||||
let kind = create_proto_alter_kind(table_info, alter_kind)?;
|
||||
|
||||
Ok(kind)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates region proto alter kind from `table_info` and `alter_kind`.
|
||||
///
|
||||
/// Returns the kind and next column id if it adds new columns.
|
||||
/// It always adds column if not exists and drops column if exists.
|
||||
/// It skips the column if it already exists in the table.
|
||||
fn create_proto_alter_kind(
|
||||
table_info: &RawTableInfo,
|
||||
alter_kind: &Kind,
|
||||
) -> Result<Option<alter_request::Kind>> {
|
||||
match alter_kind {
|
||||
Kind::AddColumns(x) => {
|
||||
// Construct a set of existing columns in the table.
|
||||
let existing_columns: HashSet<_> = table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.map(|col| &col.name)
|
||||
.collect();
|
||||
let mut next_column_id = table_info.meta.next_column_id;
|
||||
|
||||
let add_columns = x
|
||||
.add_columns
|
||||
.iter()
|
||||
.map(|add_column| {
|
||||
let column_def =
|
||||
add_column
|
||||
.column_def
|
||||
.as_ref()
|
||||
.context(InvalidProtoMsgSnafu {
|
||||
err_msg: "'column_def' is absent",
|
||||
})?;
|
||||
let mut add_columns = Vec::with_capacity(x.add_columns.len());
|
||||
for add_column in &x.add_columns {
|
||||
let column_def = add_column
|
||||
.column_def
|
||||
.as_ref()
|
||||
.context(InvalidProtoMsgSnafu {
|
||||
err_msg: "'column_def' is absent",
|
||||
})?;
|
||||
|
||||
let column_id = next_column_id;
|
||||
next_column_id += 1;
|
||||
// Skips existing columns.
|
||||
if existing_columns.contains(&column_def.name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let column_def = RegionColumnDef {
|
||||
column_def: Some(column_def.clone()),
|
||||
column_id,
|
||||
};
|
||||
let column_id = next_column_id;
|
||||
next_column_id += 1;
|
||||
let column_def = RegionColumnDef {
|
||||
column_def: Some(column_def.clone()),
|
||||
column_id,
|
||||
};
|
||||
|
||||
Ok(AddColumn {
|
||||
column_def: Some(column_def),
|
||||
location: add_column.location.clone(),
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
add_columns.push(AddColumn {
|
||||
column_def: Some(column_def),
|
||||
location: add_column.location.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(Some(alter_request::Kind::AddColumns(AddColumns {
|
||||
add_columns,
|
||||
@@ -143,6 +168,7 @@ mod tests {
|
||||
use crate::rpc::router::{Region, RegionRoute};
|
||||
use crate::test_util::{new_ddl_context, MockDatanodeManager};
|
||||
|
||||
/// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`.
|
||||
async fn prepare_ddl_context() -> (DdlContext, u64, TableId, RegionId, String) {
|
||||
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
|
||||
let ddl_context = new_ddl_context(datanode_manager);
|
||||
@@ -171,6 +197,7 @@ mod tests {
|
||||
.name("cpu")
|
||||
.data_type(ColumnDataType::Float64)
|
||||
.semantic_type(SemanticType::Field)
|
||||
.is_nullable(true)
|
||||
.build()
|
||||
.unwrap()
|
||||
.into(),
|
||||
@@ -225,15 +252,16 @@ mod tests {
|
||||
name: "my_tag3".to_string(),
|
||||
data_type: ColumnDataType::String as i32,
|
||||
is_nullable: true,
|
||||
default_constraint: b"hello".to_vec(),
|
||||
default_constraint: Vec::new(),
|
||||
semantic_type: SemanticType::Tag as i32,
|
||||
comment: String::new(),
|
||||
..Default::default()
|
||||
}),
|
||||
location: Some(AddColumnLocation {
|
||||
location_type: LocationType::After as i32,
|
||||
after_column_name: "my_tag2".to_string(),
|
||||
after_column_name: "host".to_string(),
|
||||
}),
|
||||
add_if_not_exists: false,
|
||||
}],
|
||||
})),
|
||||
},
|
||||
@@ -242,8 +270,11 @@ mod tests {
|
||||
let mut procedure =
|
||||
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let Some(Body::Alter(alter_region_request)) =
|
||||
procedure.make_alter_region_request(region_id).unwrap().body
|
||||
let alter_kind = procedure.make_region_alter_kind().unwrap();
|
||||
let Some(Body::Alter(alter_region_request)) = procedure
|
||||
.make_alter_region_request(region_id, alter_kind)
|
||||
.unwrap()
|
||||
.body
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
@@ -259,7 +290,7 @@ mod tests {
|
||||
name: "my_tag3".to_string(),
|
||||
data_type: ColumnDataType::String as i32,
|
||||
is_nullable: true,
|
||||
default_constraint: b"hello".to_vec(),
|
||||
default_constraint: Vec::new(),
|
||||
semantic_type: SemanticType::Tag as i32,
|
||||
comment: String::new(),
|
||||
..Default::default()
|
||||
@@ -268,7 +299,7 @@ mod tests {
|
||||
}),
|
||||
location: Some(AddColumnLocation {
|
||||
location_type: LocationType::After as i32,
|
||||
after_column_name: "my_tag2".to_string(),
|
||||
after_column_name: "host".to_string(),
|
||||
}),
|
||||
}]
|
||||
}
|
||||
@@ -299,8 +330,11 @@ mod tests {
|
||||
let mut procedure =
|
||||
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
|
||||
procedure.on_prepare().await.unwrap();
|
||||
let Some(Body::Alter(alter_region_request)) =
|
||||
procedure.make_alter_region_request(region_id).unwrap().body
|
||||
let alter_kind = procedure.make_region_alter_kind().unwrap();
|
||||
let Some(Body::Alter(alter_region_request)) = procedure
|
||||
.make_alter_region_request(region_id, alter_kind)
|
||||
.unwrap()
|
||||
.body
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
@@ -23,7 +23,9 @@ use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
|
||||
|
||||
impl AlterTableProcedure {
|
||||
/// Builds new_meta
|
||||
/// Builds new table info after alteration.
|
||||
/// It bumps the column id of the table by the number of the add column requests.
|
||||
/// So there may be holes in the column id sequence.
|
||||
pub(crate) fn build_new_table_info(&self, table_info: &RawTableInfo) -> Result<TableInfo> {
|
||||
let table_info =
|
||||
TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
|
||||
@@ -34,7 +36,7 @@ impl AlterTableProcedure {
|
||||
|
||||
let new_meta = table_info
|
||||
.meta
|
||||
.builder_with_alter_kind(table_ref.table, &request.alter_kind, false)
|
||||
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
|
||||
.context(error::TableSnafu)?
|
||||
.build()
|
||||
.with_context(|_| error::BuildTableMetaSnafu {
|
||||
@@ -46,6 +48,9 @@ impl AlterTableProcedure {
|
||||
new_info.ident.version = table_info.ident.version + 1;
|
||||
match request.alter_kind {
|
||||
AlterKind::AddColumns { columns } => {
|
||||
// Bumps the column id for the new columns.
|
||||
// It may bump more than the actual number of columns added if there are
|
||||
// existing columns, but it's fine.
|
||||
new_info.meta.next_column_id += columns.len() as u32;
|
||||
}
|
||||
AlterKind::RenameTable { new_table_name } => {
|
||||
|
||||
@@ -30,6 +30,8 @@ pub struct TestAlterTableExpr {
|
||||
add_columns: Vec<ColumnDef>,
|
||||
#[builder(setter(into, strip_option))]
|
||||
new_table_name: Option<String>,
|
||||
#[builder(setter)]
|
||||
add_if_not_exists: bool,
|
||||
}
|
||||
|
||||
impl From<TestAlterTableExpr> for AlterTableExpr {
|
||||
@@ -53,6 +55,7 @@ impl From<TestAlterTableExpr> for AlterTableExpr {
|
||||
.map(|col| AddColumn {
|
||||
column_def: Some(col),
|
||||
location: None,
|
||||
add_if_not_exists: value.add_if_not_exists,
|
||||
})
|
||||
.collect(),
|
||||
})),
|
||||
|
||||
@@ -56,6 +56,7 @@ fn make_alter_logical_table_add_column_task(
|
||||
let alter_table = alter_table
|
||||
.table_name(table.to_string())
|
||||
.add_columns(add_columns)
|
||||
.add_if_not_exists(true)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -139,7 +139,7 @@ async fn test_on_submit_alter_request() {
|
||||
table_name: table_name.to_string(),
|
||||
kind: Some(Kind::DropColumns(DropColumns {
|
||||
drop_columns: vec![DropColumn {
|
||||
name: "my_field_column".to_string(),
|
||||
name: "cpu".to_string(),
|
||||
}],
|
||||
})),
|
||||
},
|
||||
@@ -225,7 +225,7 @@ async fn test_on_submit_alter_request_with_outdated_request() {
|
||||
table_name: table_name.to_string(),
|
||||
kind: Some(Kind::DropColumns(DropColumns {
|
||||
drop_columns: vec![DropColumn {
|
||||
name: "my_field_column".to_string(),
|
||||
name: "cpu".to_string(),
|
||||
}],
|
||||
})),
|
||||
},
|
||||
@@ -330,6 +330,7 @@ async fn test_on_update_metadata_add_columns() {
|
||||
..Default::default()
|
||||
}),
|
||||
location: None,
|
||||
add_if_not_exists: false,
|
||||
}],
|
||||
})),
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user