fix: fix alter table verification (#2437)

* fix: fix verify alter

* refactor: move AlterTable UpdateMetadata to last step

* refactor: send region request in parallel

* Update src/table/src/metadata.rs

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* Update src/table/src/metadata.rs

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

---------

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
Weny Xu
2023-09-19 22:40:48 +09:00
committed by GitHub
parent 0f79ccab31
commit 339e12c64a
6 changed files with 148 additions and 68 deletions

View File

@@ -117,9 +117,10 @@ impl AlterTableProcedure {
let catalog = &alter_expr.catalog_name;
let schema = &alter_expr.schema_name;
let alter_kind = self.alter_kind()?;
let manager = &self.context.table_metadata_manager;
if let Kind::RenameTable(RenameTable { new_table_name }) = self.alter_kind()? {
if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name);
let exist = manager
@@ -146,7 +147,11 @@ impl AlterTableProcedure {
}
);
self.data.state = AlterTableState::UpdateMetadata;
if matches!(alter_kind, Kind::RenameTable { .. }) {
self.data.state = AlterTableState::UpdateMetadata;
} else {
self.data.state = AlterTableState::SubmitAlterRegionRequests;
};
Ok(Status::executing(true))
}
@@ -174,7 +179,7 @@ impl AlterTableProcedure {
})
}
pub async fn submit_alter_region_requests(&self) -> Result<Status> {
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();
@@ -192,30 +197,31 @@ impl AlterTableProcedure {
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let datanode_manager = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(&region_routes, &datanode);
alter_region_tasks.push(async move {
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.create_alter_region_request(region_id)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: common_telemetry::trace_id().unwrap_or_default(),
..Default::default()
}),
body: Some(region_request::Body::Alter(request)),
};
debug!("Submitting {request:?} to {datanode}");
let requester = datanode_manager.datanode(&datanode).await;
let datanode = datanode.clone();
let requester = requester.clone();
alter_region_tasks.push(async move {
if let Err(e) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(e));
}
}
Ok(())
});
Ok(())
});
}
}
future::join_all(alter_region_tasks)
@@ -223,7 +229,9 @@ impl AlterTableProcedure {
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(Status::Done)
self.data.state = AlterTableState::UpdateMetadata;
Ok(Status::executing(true))
}
/// Update table metadata for rename table operation.
@@ -313,22 +321,17 @@ impl AlterTableProcedure {
let alter_kind = self.alter_kind()?;
let cache_invalidator = &self.context.cache_invalidator;
let status = if matches!(alter_kind, Kind::RenameTable { .. }) {
if matches!(alter_kind, Kind::RenameTable { .. }) {
cache_invalidator
.invalidate_table_name(&Context::default(), self.data.table_ref().into())
.await?;
Status::Done
} else {
cache_invalidator
.invalidate_table_id(&Context::default(), self.data.table_id())
.await?;
self.data.state = AlterTableState::SubmitAlterRegionRequests;
Status::executing(true)
};
Ok(status)
Ok(Status::Done)
}
fn lock_key_inner(&self) -> Vec<String> {
@@ -376,9 +379,9 @@ impl Procedure for AlterTableProcedure {
match state {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
}
.map_err(error_handler)
}
@@ -398,11 +401,11 @@ impl Procedure for AlterTableProcedure {
enum AlterTableState {
/// Prepares to alter the table
Prepare,
SubmitAlterRegionRequests,
/// Updates table metadata.
UpdateMetadata,
/// Broadcasts the invalidating table cache instruction.
InvalidateTableCache,
SubmitAlterRegionRequests,
}
#[derive(Debug, Serialize, Deserialize)]

View File

@@ -182,7 +182,7 @@ impl CreateTableProcedure {
let mut create_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let manager = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let requests = regions
@@ -197,23 +197,25 @@ impl CreateTableProcedure {
})
.collect::<Vec<_>>();
create_region_tasks.push(async move {
for request in requests {
let requester = manager.datanode(&datanode).await;
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(request),
};
let datanode = datanode.clone();
let requester = requester.clone();
create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}
}
Ok(())
});
Ok(())
});
}
}
join_all(create_region_tasks)

View File

@@ -142,7 +142,7 @@ impl DropTableProcedure {
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let clients = self.context.datanode_manager.clone();
let requester = self.context.datanode_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
@@ -150,28 +150,31 @@ impl DropTableProcedure {
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();
drop_region_tasks.push(async move {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};
let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
})),
};
if let Err(err) = clients.datanode(&datanode).await.handle(request).await {
let datanode = datanode.clone();
let requester = requester.clone();
drop_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_operate_region_error(datanode)(err));
}
}
}
Ok(())
});
Ok(())
});
}
}
join_all(drop_region_tasks)

View File

@@ -362,7 +362,7 @@ async fn test_submit_alter_region_requests() {
.await
.unwrap();
let procedure = AlterTableProcedure::new(
let mut procedure = AlterTableProcedure::new(
1,
alter_table_task,
TableInfoValue::new(table_info),
@@ -393,7 +393,7 @@ async fn test_submit_alter_region_requests() {
});
let status = procedure.submit_alter_region_requests().await.unwrap();
assert!(matches!(status, Status::Done));
assert!(matches!(status, Status::Executing { persist: true }));
handle.await.unwrap();

View File

@@ -122,6 +122,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid alter table({}) request: {}", table, err))]
InvalidAlterRequest {
table: String,
location: Location,
err: String,
},
#[snafu(display("Invalid table state: {}", table_id))]
InvalidTable {
table_id: TableId,
@@ -141,9 +148,9 @@ impl ErrorExt for Error {
Error::Datafusion { .. }
| Error::SchemaConversion { .. }
| Error::TableProjection { .. } => StatusCode::EngineExecuteQuery,
Error::RemoveColumnInIndex { .. } | Error::BuildColumnDescriptor { .. } => {
StatusCode::InvalidArguments
}
Error::RemoveColumnInIndex { .. }
| Error::BuildColumnDescriptor { .. }
| Error::InvalidAlterRequest { .. } => StatusCode::InvalidArguments,
Error::TablesRecordBatch { .. } | Error::DuplicatedExecuteCall { .. } => {
StatusCode::Unexpected
}

View File

@@ -244,6 +244,41 @@ impl TableMeta {
let original_primary_key_indices: HashSet<&usize> =
self.primary_key_indices.iter().collect();
let mut names = HashSet::with_capacity(requests.len());
for col_to_add in requests {
ensure!(
names.insert(&col_to_add.column_schema.name),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"add column {} more than once",
col_to_add.column_schema.name
),
}
);
ensure!(
!table_schema.contains_column(&col_to_add.column_schema.name),
error::ColumnExistsSnafu {
table_name,
column_name: col_to_add.column_schema.name.to_string()
},
);
ensure!(
col_to_add.column_schema.is_nullable()
|| col_to_add.column_schema.default_constraint().is_some(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"no default value for column {}",
col_to_add.column_schema.name
),
},
);
}
let SplitResult {
columns_at_first,
columns_at_after,
@@ -858,6 +893,36 @@ mod tests {
assert_eq!(StatusCode::TableColumnExists, err.status_code());
}
#[test]
fn test_add_invalid_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new(
"weny",
ConcreteDataType::string_datatype(),
false,
),
is_key: false,
location: None,
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_remove_unknown_column() {
let schema = Arc::new(new_test_schema());