feat: batch alter logical tables (#3569)

* feat: add unit test for alter logical tables

* Update src/common/meta/src/ddl/alter_table.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* feat: add some comments

* chore: add debug_assert_eq

* chore: fix some nits

* chore: remove the method batch_get_table_routes

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
JeremyHi authored 2024-03-26 15:07:23 +08:00, committed by GitHub
parent 7c1c6e8b8c
commit c2dd1136fe
28 changed files with 1641 additions and 245 deletions

View File

@@ -28,6 +28,7 @@ use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_logical_tables;
pub mod create_table;

View File

@@ -0,0 +1,253 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod check;
mod metadata;
mod region_request;
mod update_metadata;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, Status};
use futures_util::future;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use strum::AsRefStr;
use table::metadata::TableId;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{Error, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
use crate::{cache_invalidator, metrics, ClusterId};
pub struct AlterLogicalTablesProcedure {
pub context: DdlContext,
pub data: AlterTablesData,
}
impl AlterLogicalTablesProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
pub fn new(
cluster_id: ClusterId,
tasks: Vec<AlterTableTask>,
physical_table_id: TableId,
context: DdlContext,
) -> Self {
Self {
context,
data: AlterTablesData {
cluster_id,
state: AlterTablesState::Prepare,
tasks,
table_info_values: vec![],
physical_table_id,
physical_table_route: None,
cache_invalidate_keys: vec![],
},
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
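/// Validates the tasks, loads the table metadata, and filters out tasks that
/// have already been applied, so a retried procedure stays idempotent.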
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
// Checks all the tasks
self.check_input_tasks()?;
// Fills the table info values
self.fill_table_info_values().await?;
// Checks the physical table; must be called after `fill_table_info_values`
self.check_physical_table().await?;
// Fills the physical table route
self.fill_physical_table_route().await?;
// Filters out the finished tasks
let finished_tasks = self.check_finished_tasks()?;
if finished_tasks.iter().all(|x| *x) {
return Ok(Status::done());
}
self.filter_task(&finished_tasks)?;
// Next state
self.data.state = AlterTablesState::SubmitAlterRegionRequests;
Ok(Status::executing(true))
}
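/// Sends one alter request per leader region of the physical table, in
/// parallel, and fails if any datanode reports an error.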
pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
// Safety: `physical_table_route` is filled in `on_prepare`
let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
for peer in leaders {
let requester = self.context.datanode_manager.datanode(&peer).await;
let region_numbers = find_leader_regions(&physical_table_route.region_routes, &peer);
for region_number in region_numbers {
let request = self.make_request(region_number)?;
let peer = peer.clone();
let requester = requester.clone();
alter_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer))
});
}
}
future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
self.data.state = AlterTablesState::UpdateMetadata;
Ok(Status::executing(true))
}
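/// Persists the updated table info values, splitting them into chunks so a
/// single transaction never exceeds the backend's `max_txn_ops` limit.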
pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
let table_info_values = self.build_update_metadata()?;
let manager = &self.context.table_metadata_manager;
let chunk_size = manager.batch_update_table_info_value_chunk_size();
if table_info_values.len() > chunk_size {
let chunks = table_info_values
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
manager.batch_update_table_info_values(chunk).await?;
}
} else {
manager
.batch_update_table_info_values(table_info_values)
.await?;
}
self.data.state = AlterTablesState::InvalidateTableCache;
Ok(Status::executing(true))
}
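/// Broadcasts cache invalidation instructions for every altered logical table.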
pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
let to_invalidate = self
.data
.cache_invalidate_keys
.drain(..)
.map(CacheIdent::TableId)
.collect::<Vec<_>>();
self.context
.cache_invalidator
.invalidate(&cache_invalidator::Context::default(), to_invalidate)
.await?;
Ok(Status::done())
}
}
#[async_trait]
impl Procedure for AlterLogicalTablesProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
let error_handler = |e: Error| {
if e.is_retry_later() {
common_procedure::Error::retry_later(e)
} else {
common_procedure::Error::external(e)
}
};
let state = &self.data.state;
let step = state.as_ref();
let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
.with_label_values(&[step])
.start_timer();
match state {
AlterTablesState::Prepare => self.on_prepare().await,
AlterTablesState::SubmitAlterRegionRequests => {
self.on_submit_alter_region_requests().await
}
AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
}
.map_err(error_handler)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
// CatalogLock, SchemaLock,
// TableLock
// TableNameLock(s)
let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
let table_ref = self.data.tasks[0].table_ref();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.data.physical_table_id).into());
for task in &self.data.tasks {
lock_key.push(
TableNameLock::new(
&task.alter_table.catalog_name,
&task.alter_table.schema_name,
&task.alter_table.table_name,
)
.into(),
);
}
LockKey::new(lock_key)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTablesData {
cluster_id: ClusterId,
state: AlterTablesState,
tasks: Vec<AlterTableTask>,
/// Table info values before the alter operation.
/// Corresponds one-to-one with the `AlterTableTask`s in `tasks`.
table_info_values: Vec<TableInfoValue>,
/// Physical table info
physical_table_id: TableId,
physical_table_route: Option<PhysicalTableRouteValue>,
cache_invalidate_keys: Vec<TableId>,
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTablesState {
/// Prepares to alter the tables.
Prepare,
/// Submits the alter requests to the datanode regions.
SubmitAlterRegionRequests,
/// Updates table metadata.
UpdateMetadata,
/// Broadcasts the invalidating table cache instruction.
InvalidateTableCache,
}
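For reference, a minimal sketch of driving this procedure end to end, modeled on the unit tests added later in this commit (`new_ddl_context`, `MockDatanodeManager`, `NaiveDatanodeHandler`, and `MockContextProvider` are test helpers; the production entry point is `DdlManager::submit_alter_logical_table_tasks`):

// Sketch only: steps the state machine Prepare -> SubmitAlterRegionRequests
// -> UpdateMetadata -> InvalidateTableCache until it reports `Status::Done`.
let ddl_context = new_ddl_context(Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)));
let mut procedure =
    AlterLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
let ctx = common_procedure::Context {
    procedure_id: ProcedureId::random(),
    provider: Arc::new(MockContextProvider::default()),
};
while !matches!(procedure.execute(&ctx).await?, Status::Done { .. }) {}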

View File

@@ -0,0 +1,136 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use api::v1::alter_expr::Kind;
use snafu::{ensure, OptionExt};
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::{AlterLogicalTablesInvalidArgumentsSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::AlterTableTask;
impl AlterLogicalTablesProcedure {
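/// Validates that all tasks target the same catalog and schema and that
/// every alter kind is `AddColumns`.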
pub(crate) fn check_input_tasks(&self) -> Result<()> {
self.check_schema()?;
self.check_alter_kind()?;
Ok(())
}
pub(crate) async fn check_physical_table(&self) -> Result<()> {
let table_route_manager = self.context.table_metadata_manager.table_route_manager();
let table_ids = self
.data
.table_info_values
.iter()
.map(|v| v.table_info.ident.table_id)
.collect::<Vec<_>>();
let table_routes = table_route_manager
.table_route_storage()
.batch_get(&table_ids)
.await?;
let physical_table_id = self.data.physical_table_id;
let is_same_physical_table = table_routes.iter().all(|r| {
if let Some(TableRouteValue::Logical(r)) = r {
r.physical_table_id() == physical_table_id
} else {
false
}
});
ensure!(
is_same_physical_table,
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "All the tasks should have the same physical table id"
}
);
Ok(())
}
pub(crate) fn check_finished_tasks(&self) -> Result<Vec<bool>> {
let tasks = &self.data.tasks;
let table_info_values = &self.data.table_info_values;
Ok(tasks
.iter()
.zip(table_info_values.iter())
.map(|(task, table)| Self::check_finished_task(task, table))
.collect())
}
// Checks if the schemas of the tasks are the same
fn check_schema(&self) -> Result<()> {
let is_same_schema = self.data.tasks.windows(2).all(|pair| {
pair[0].alter_table.catalog_name == pair[1].alter_table.catalog_name
&& pair[0].alter_table.schema_name == pair[1].alter_table.schema_name
});
ensure!(
is_same_schema,
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Schemas of the tasks are not the same"
}
);
Ok(())
}
fn check_alter_kind(&self) -> Result<()> {
for task in &self.data.tasks {
let kind = task.alter_table.kind.as_ref().context(
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Alter kind is missing",
},
)?;
let Kind::AddColumns(_) = kind else {
return AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "Only support add columns operation",
}
.fail();
};
}
Ok(())
}
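/// A task is finished when every column it adds already exists in the table.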
fn check_finished_task(task: &AlterTableTask, table: &TableInfoValue) -> bool {
let columns = table
.table_info
.meta
.schema
.column_schemas
.iter()
.map(|c| &c.name)
.collect::<HashSet<_>>();
let Some(kind) = task.alter_table.kind.as_ref() else {
return true; // Unreachable: `check_alter_kind` has already validated this
};
let Kind::AddColumns(add_columns) = kind else {
return true; // Unreachable: `check_alter_kind` has already validated this
};
// A task counts as finished only if all of its added columns already exist;
// if some columns exist but others do not, the task is considered
// unfinished.
add_columns
.add_columns
.iter()
.map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
.all(|column| column.map(|c| columns.contains(c)).unwrap_or(false))
}
}

View File

@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::format_full_table_name;
use snafu::OptionExt;
use table::metadata::TableId;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::{Result, TableInfoNotFoundSnafu, TableNotFoundSnafu};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::rpc::ddl::AlterTableTask;
impl AlterLogicalTablesProcedure {
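/// Removes the finished tasks and their table info values, keeping the two
/// vectors aligned index by index, and records the table ids whose caches
/// must be invalidated later.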
pub(crate) fn filter_task(&mut self, finished_tasks: &[bool]) -> Result<()> {
debug_assert_eq!(finished_tasks.len(), self.data.tasks.len());
debug_assert_eq!(finished_tasks.len(), self.data.table_info_values.len());
self.data.tasks = self
.data
.tasks
.drain(..)
.zip(finished_tasks.iter())
.filter_map(|(task, finished)| if *finished { None } else { Some(task) })
.collect();
self.data.table_info_values = self
.data
.table_info_values
.drain(..)
.zip(finished_tasks.iter())
.filter_map(|(table_info_value, finished)| {
if *finished {
None
} else {
Some(table_info_value)
}
})
.collect();
self.data.cache_invalidate_keys = self
.data
.table_info_values
.iter()
.map(|table| table.table_info.ident.table_id)
.collect();
Ok(())
}
pub(crate) async fn fill_physical_table_route(&mut self) -> Result<()> {
let table_route_manager = self.context.table_metadata_manager.table_route_manager();
let (_, physical_table_route) = table_route_manager
.get_physical_table_route(self.data.physical_table_id)
.await?;
self.data.physical_table_route = Some(physical_table_route);
Ok(())
}
pub(crate) async fn fill_table_info_values(&mut self) -> Result<()> {
let table_ids = self.get_all_table_ids().await?;
let table_info_values = self.get_all_table_info_values(&table_ids).await?;
debug_assert_eq!(table_info_values.len(), self.data.tasks.len());
self.data.table_info_values = table_info_values;
Ok(())
}
async fn get_all_table_info_values(
&self,
table_ids: &[TableId],
) -> Result<Vec<TableInfoValue>> {
let table_info_manager = self.context.table_metadata_manager.table_info_manager();
let mut table_info_map = table_info_manager.batch_get(table_ids).await?;
let mut table_info_values = Vec::with_capacity(table_ids.len());
for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) {
let table_info_value =
table_info_map
.remove(table_id)
.with_context(|| TableInfoNotFoundSnafu {
table_name: extract_table_name(task),
})?;
table_info_values.push(table_info_value);
}
Ok(table_info_values)
}
async fn get_all_table_ids(&self) -> Result<Vec<TableId>> {
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
let table_name_keys = self
.data
.tasks
.iter()
.map(|task| extract_table_name_key(task))
.collect();
let table_name_values = table_name_manager.batch_get(table_name_keys).await?;
let mut table_ids = Vec::with_capacity(table_name_values.len());
for (value, task) in table_name_values.into_iter().zip(self.data.tasks.iter()) {
let table_id = value
.with_context(|| TableNotFoundSnafu {
table_name: extract_table_name(task),
})?
.table_id();
table_ids.push(table_id);
}
Ok(table_ids)
}
}
#[inline]
fn extract_table_name(task: &AlterTableTask) -> String {
format_full_table_name(
&task.alter_table.catalog_name,
&task.alter_table.schema_name,
&task.alter_table.table_name,
)
}
#[inline]
fn extract_table_name_key(task: &AlterTableTask) -> TableNameKey {
TableNameKey::new(
&task.alter_table.catalog_name,
&task.alter_table.schema_name,
&task.alter_table.table_name,
)
}

View File

@@ -0,0 +1,98 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1;
use api::v1::alter_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::{RegionId, RegionNumber};
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::Result;
use crate::key::table_info::TableInfoValue;
use crate::rpc::ddl::AlterTableTask;
impl AlterLogicalTablesProcedure {
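/// Builds a region request carrying one alter request per logical table for
/// the given region number on the physical table.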
pub(crate) fn make_request(&self, region_number: RegionNumber) -> Result<RegionRequest> {
let alter_requests = self.make_alter_region_requests(region_number)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Alters(alter_requests)),
};
Ok(request)
}
fn make_alter_region_requests(&self, region_number: RegionNumber) -> Result<AlterRequests> {
let mut requests = Vec::with_capacity(self.data.tasks.len());
for (task, table) in self
.data
.tasks
.iter()
.zip(self.data.table_info_values.iter())
{
let region_id = RegionId::new(table.table_info.ident.table_id, region_number);
let request = self.make_alter_region_request(region_id, task, table)?;
requests.push(request);
}
Ok(AlterRequests { requests })
}
fn make_alter_region_request(
&self,
region_id: RegionId,
task: &AlterTableTask,
table: &TableInfoValue,
) -> Result<AlterRequest> {
let region_id = region_id.as_u64();
let schema_version = table.table_info.ident.version;
let kind = match &task.alter_table.kind {
Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
to_region_add_columns(add_columns),
)),
_ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
};
Ok(AlterRequest {
region_id,
schema_version,
kind,
})
}
}
fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
let add_columns = add_columns
.add_columns
.iter()
.map(|add_column| {
let region_column_def = RegionColumnDef {
column_def: add_column.column_def.clone(),
..Default::default() // other fields are not used in alter logical table
};
AddColumn {
column_def: Some(region_column_def),
..Default::default() // other fields are not used in alter logical table
}
})
.collect();
AddColumns { add_columns }
}

View File

@@ -0,0 +1,66 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_grpc_expr::alter_expr_to_request;
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableInfo};
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error;
use crate::error::{ConvertAlterTableRequestSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::rpc::ddl::AlterTableTask;
impl AlterLogicalTablesProcedure {
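/// Builds the `(old table info value, new raw table info)` pairs that the
/// metadata manager persists in `on_update_metadata`.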
pub(crate) fn build_update_metadata(&mut self) -> Result<Vec<(TableInfoValue, RawTableInfo)>> {
let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
let table_info_values = std::mem::take(&mut self.data.table_info_values);
for (task, table) in self.data.tasks.iter().zip(table_info_values.into_iter()) {
table_info_values_to_update.push(self.build_new_table_info(task, table)?);
}
Ok(table_info_values_to_update)
}
fn build_new_table_info(
&self,
task: &AlterTableTask,
table: TableInfoValue,
) -> Result<(TableInfoValue, RawTableInfo)> {
// Builds new_meta
let table_info = TableInfo::try_from(table.table_info.clone())
.context(error::ConvertRawTableInfoSnafu)?;
let table_ref = task.table_ref();
let request =
alter_expr_to_request(table.table_info.ident.table_id, task.alter_table.clone())
.context(ConvertAlterTableRequestSnafu)?;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind, true)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
table_name: table_ref.table,
})?;
let version = table_info.ident.version + 1;
let mut new_table = table_info;
new_table.meta = new_meta;
new_table.ident.version = version;
let mut raw_table_info = RawTableInfo::from(new_table);
raw_table_info.sort_columns();
Ok((table, raw_table_info))
}
}

View File

@@ -281,7 +281,7 @@ impl AlterTableProcedure {
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.builder_with_alter_kind(table_ref.table, &request.alter_kind, false)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
@@ -363,7 +363,8 @@ impl AlterTableProcedure {
)
.into(),
);
lock_key.push(TableLock::Read(*physical_table_id).into())
// We must acquire the write lock since this may update the physical table schema
lock_key.push(TableLock::Write(*physical_table_id).into())
}
let table_ref = self.data.table_ref();

View File

@@ -54,7 +54,7 @@ use crate::{metrics, ClusterId};
pub struct CreateLogicalTablesProcedure {
pub context: DdlContext,
pub creator: TablesCreator,
pub data: CreateTablesData,
}
impl CreateLogicalTablesProcedure {
@@ -66,14 +66,22 @@ impl CreateLogicalTablesProcedure {
physical_table_id: TableId,
context: DdlContext,
) -> Self {
let creator = TablesCreator::new(cluster_id, tasks, physical_table_id);
Self { context, creator }
let len = tasks.len();
let data = CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
physical_columns: vec![],
};
Self { context, data }
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
let creator = TablesCreator { data };
Ok(Self { context, creator })
Ok(Self { context, data })
}
/// On the prepares step, it performs:
@@ -90,19 +98,17 @@ impl CreateLogicalTablesProcedure {
let manager = &self.context.table_metadata_manager;
// Sets physical region numbers
let physical_table_id = self.creator.data.physical_table_id();
let physical_table_id = self.data.physical_table_id();
let physical_region_numbers = manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;
self.creator
.data
self.data
.set_physical_region_numbers(physical_region_numbers);
// Checks if the tables exist
let table_name_keys = self
.creator
.data
.all_create_table_exprs()
.iter()
@@ -117,7 +123,7 @@ impl CreateLogicalTablesProcedure {
.collect::<Vec<_>>();
// Validates the tasks
let tasks = &mut self.creator.data.tasks;
let tasks = &mut self.data.tasks;
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
if table_id.is_some() {
// If a table already exists, we just ignore it.
@@ -155,19 +161,16 @@ impl CreateLogicalTablesProcedure {
// sort columns in task
task.sort_columns();
common_telemetry::debug!("sorted task {:?}", task);
}
self.creator
.data
self.data
.set_table_ids_already_exists(already_exists_tables_ids);
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
self.data.state = CreateTablesState::DatanodeCreateRegions;
Ok(Status::executing(true))
}
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let physical_table_id = self.creator.data.physical_table_id();
let physical_table_id = self.data.physical_table_id();
let (_, physical_table_route) = self
.context
.table_metadata_manager
@@ -186,12 +189,12 @@ impl CreateLogicalTablesProcedure {
/// - Failed to create table metadata.
pub async fn on_create_metadata(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
let physical_table_id = self.creator.data.physical_table_id();
let remaining_tasks = self.creator.data.remaining_tasks();
let physical_table_id = self.data.physical_table_id();
let remaining_tasks = self.data.remaining_tasks();
let num_tables = remaining_tasks.len();
if num_tables > 0 {
let chunk_size = manager.max_logical_tables_per_batch();
let chunk_size = manager.create_logical_tables_metadata_chunk_size();
if num_tables > chunk_size {
let chunks = remaining_tasks
.into_iter()
@@ -212,28 +215,26 @@ impl CreateLogicalTablesProcedure {
// The `table_id`s MUST be collected after [CreateTablesState::Prepare],
// which ensures that all `table_id`s have been allocated.
let table_ids = self
.creator
.data
.tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
if !self.creator.data.physical_columns.is_empty() {
if !self.data.physical_columns.is_empty() {
// fetch old physical table's info
let physical_table_info = self
.context
.table_metadata_manager
.get_full_table_info(self.creator.data.physical_table_id)
.get_full_table_info(self.data.physical_table_id)
.await?
.0
.context(TableInfoNotFoundSnafu {
table_name: format!("table id - {}", self.creator.data.physical_table_id),
table_name: format!("table id - {}", self.data.physical_table_id),
})?;
// generate new table info
let new_table_info = self
.creator
.data
.build_new_physical_table_info(&physical_table_info);
@@ -248,7 +249,7 @@ impl CreateLogicalTablesProcedure {
.cache_invalidator
.invalidate(
&Context::default(),
vec![CacheIdent::TableId(self.creator.data.physical_table_id)],
vec![CacheIdent::TableId(self.data.physical_table_id)],
)
.await?;
} else {
@@ -275,7 +276,7 @@ impl CreateLogicalTablesProcedure {
datanode: &Peer,
region_routes: &[RegionRoute],
) -> Result<CreateRequests> {
let create_tables_data = &self.creator.data;
let create_tables_data = &self.data;
let tasks = &create_tables_data.tasks;
let physical_table_id = create_tables_data.physical_table_id();
let regions = find_leader_regions(region_routes, datanode);
@@ -332,7 +333,7 @@ impl CreateLogicalTablesProcedure {
.collect::<Result<Vec<_>>>()?;
if raw_schemas.is_empty() {
self.creator.data.state = CreateTablesState::CreateMetadata;
self.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(false));
}
@@ -350,12 +351,12 @@ impl CreateLogicalTablesProcedure {
if let Some(raw_schema) = first {
let physical_columns =
ColumnMetadata::decode_list(raw_schema).context(DecodeJsonSnafu)?;
self.creator.data.physical_columns = physical_columns;
self.data.physical_columns = physical_columns;
} else {
warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}
self.creator.data.state = CreateTablesState::CreateMetadata;
self.data.state = CreateTablesState::CreateMetadata;
// Ensures that procedures restarted after a crash begin from the `DatanodeCreateRegions` stage.
Ok(Status::executing(false))
@@ -369,7 +370,7 @@ impl Procedure for CreateLogicalTablesProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
.with_label_values(&[state.as_ref()])
@@ -384,20 +385,20 @@ impl Procedure for CreateLogicalTablesProcedure {
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
// CatalogLock, SchemaLock,
// TableLock
// TableNameLock(s)
let mut lock_key = Vec::with_capacity(2 + 1 + self.creator.data.tasks.len());
let table_ref = self.creator.data.tasks[0].table_ref();
let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
let table_ref = self.data.tasks[0].table_ref();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.creator.data.physical_table_id()).into());
lock_key.push(TableLock::Write(self.data.physical_table_id()).into());
for task in &self.creator.data.tasks {
for task in &self.data.tasks {
lock_key.push(
TableNameLock::new(
&task.create_table.catalog_name,
@@ -411,33 +412,6 @@ impl Procedure for CreateLogicalTablesProcedure {
}
}
pub struct TablesCreator {
/// The serializable data.
pub data: CreateTablesData,
}
impl TablesCreator {
pub fn new(
cluster_id: ClusterId,
tasks: Vec<CreateTableTask>,
physical_table_id: TableId,
) -> Self {
let len = tasks.len();
Self {
data: CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
physical_columns: vec![],
},
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTablesData {
cluster_id: ClusterId,

View File

@@ -198,8 +198,10 @@ mod tests {
use table::metadata::RawTableInfo;
use super::*;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
use crate::table_name::TableName;
use crate::test_util::{new_ddl_context, MockDatanodeManager};

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod alter_table;
pub mod columns;
pub mod create_table;
pub use create_table::{
TestColumnDef, TestColumnDefBuilder, TestCreateTableExpr, TestCreateTableExprBuilder,
};

View File

@@ -0,0 +1,62 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::{AddColumn, AddColumns, AlterExpr, ColumnDef, RenameTable};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use derive_builder::Builder;
#[derive(Default, Builder)]
#[builder(default)]
pub struct TestAlterTableExpr {
#[builder(setter(into), default = "DEFAULT_CATALOG_NAME.to_string()")]
catalog_name: String,
#[builder(setter(into), default = "DEFAULT_SCHEMA_NAME.to_string()")]
schema_name: String,
#[builder(setter(into))]
table_name: String,
#[builder(setter(into))]
add_columns: Vec<ColumnDef>,
#[builder(setter(into))]
new_table_name: Option<String>,
}
impl From<TestAlterTableExpr> for AlterExpr {
fn from(value: TestAlterTableExpr) -> Self {
if let Some(new_table_name) = value.new_table_name {
Self {
catalog_name: value.catalog_name,
schema_name: value.schema_name,
table_name: value.table_name,
kind: Some(Kind::RenameTable(RenameTable { new_table_name })),
}
} else {
Self {
catalog_name: value.catalog_name,
schema_name: value.schema_name,
table_name: value.table_name,
kind: Some(Kind::AddColumns(AddColumns {
add_columns: value
.add_columns
.into_iter()
.map(|col| AddColumn {
column_def: Some(col),
location: None,
})
.collect(),
})),
}
}
}
}

View File

@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{ColumnDataType, ColumnDef, SemanticType};
use derive_builder::Builder;
#[derive(Default, Builder)]
pub struct TestColumnDef {
#[builder(setter(into), default)]
name: String,
data_type: ColumnDataType,
#[builder(default)]
is_nullable: bool,
semantic_type: SemanticType,
#[builder(setter(into), default)]
comment: String,
}
impl From<TestColumnDef> for ColumnDef {
fn from(
TestColumnDef {
name,
data_type,
is_nullable,
semantic_type,
comment,
}: TestColumnDef,
) -> Self {
Self {
name,
data_type: data_type as i32,
is_nullable,
default_constraint: vec![],
semantic_type: semantic_type as i32,
comment,
datatype_extension: None,
}
}
}

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use api::v1::column_def::try_as_column_schema;
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
use datatypes::schema::RawSchema;
@@ -24,40 +24,6 @@ use store_api::storage::TableId;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
#[derive(Default, Builder)]
pub struct TestColumnDef {
#[builder(setter(into), default)]
name: String,
data_type: ColumnDataType,
#[builder(default)]
is_nullable: bool,
semantic_type: SemanticType,
#[builder(setter(into), default)]
comment: String,
}
impl From<TestColumnDef> for ColumnDef {
fn from(
TestColumnDef {
name,
data_type,
is_nullable,
semantic_type,
comment,
}: TestColumnDef,
) -> Self {
Self {
name,
data_type: data_type as i32,
is_nullable,
default_constraint: vec![],
semantic_type: semantic_type as i32,
comment,
datatype_extension: None,
}
}
}
#[derive(Default, Builder)]
#[builder(default)]
pub struct TestCreateTableExpr {

View File

@@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod alter_logical_tables;
mod create_logical_tables;
mod create_table;

View File

@@ -0,0 +1,412 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::sync::Arc;
use api::v1::{ColumnDataType, SemanticType};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::{Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use table::metadata::TableId;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::tests::create_logical_tables;
use crate::ddl::tests::create_logical_tables::{
test_create_physical_table_task, NaiveDatanodeHandler,
};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound};
use crate::key::table_name::TableNameKey;
use crate::rpc::ddl::AlterTableTask;
use crate::test_util::{new_ddl_context, MockDatanodeManager};
use crate::ClusterId;
fn make_alter_logical_table_add_column_task(
schema: Option<&str>,
table: &str,
add_columns: Vec<String>,
) -> AlterTableTask {
let add_columns = add_columns
.into_iter()
.map(|name| {
TestColumnDefBuilder::default()
.name(name)
.data_type(ColumnDataType::String)
.is_nullable(true)
.semantic_type(SemanticType::Tag)
.comment("new column".to_string())
.build()
.unwrap()
.into()
})
.collect::<Vec<_>>();
let mut alter_table = TestAlterTableExprBuilder::default();
if let Some(schema) = schema {
alter_table.schema_name(schema.to_string());
}
let alter_table = alter_table
.table_name(table.to_string())
.add_columns(add_columns)
.build()
.unwrap();
AlterTableTask {
alter_table: alter_table.into(),
}
}
fn make_alter_logical_table_rename_task(
schema: &str,
table: &str,
new_table_name: &str,
) -> AlterTableTask {
let alter_table = TestAlterTableExprBuilder::default()
.schema_name(schema.to_string())
.table_name(table.to_string())
.new_table_name(new_table_name.to_string())
.build()
.unwrap();
AlterTableTask {
alter_table: alter_table.into(),
}
}
#[tokio::test]
async fn test_on_prepare_check_schema() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let tasks = vec![
make_alter_logical_table_add_column_task(
Some("schema1"),
"table1",
vec!["column1".to_string()],
),
make_alter_logical_table_add_column_task(
Some("schema2"),
"table2",
vec!["column2".to_string()],
),
];
let physical_table_id = 1024u32;
let mut procedure =
AlterLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, AlterLogicalTablesInvalidArguments { .. });
}
#[tokio::test]
async fn test_on_prepare_check_alter_kind() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let tasks = vec![make_alter_logical_table_rename_task(
"schema1",
"table1",
"new_table1",
)];
let physical_table_id = 1024u32;
let mut procedure =
AlterLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, AlterLogicalTablesInvalidArguments { .. });
}
async fn create_physical_table(
ddl_context: DdlContext,
cluster_id: ClusterId,
name: &str,
) -> TableId {
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task(name);
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_logical_tables::create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
table_id
}
async fn create_logical_table(
ddl_context: DdlContext,
cluster_id: ClusterId,
physical_table_id: TableId,
table_name: &str,
) {
let tasks = vec![create_logical_tables::test_create_logical_table_task(
table_name,
)];
let mut procedure =
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let status = procedure.on_create_metadata().await.unwrap();
assert_matches!(status, Status::Done { .. });
}
#[tokio::test]
async fn test_on_prepare_different_physical_table() {
let cluster_id = 1;
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let phy1_id = create_physical_table(ddl_context.clone(), cluster_id, "phy1").await;
create_logical_table(ddl_context.clone(), cluster_id, phy1_id, "table1").await;
let phy2_id = create_physical_table(ddl_context.clone(), cluster_id, "phy2").await;
create_logical_table(ddl_context.clone(), cluster_id, phy2_id, "table2").await;
let tasks = vec![
make_alter_logical_table_add_column_task(None, "table1", vec!["column1".to_string()]),
make_alter_logical_table_add_column_task(None, "table2", vec!["column2".to_string()]),
];
let mut procedure = AlterLogicalTablesProcedure::new(cluster_id, tasks, phy1_id, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, AlterLogicalTablesInvalidArguments { .. });
}
#[tokio::test]
async fn test_on_prepare_logical_table_not_exists() {
let cluster_id = 1;
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
// Creates physical table
let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
// Creates one logical table
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
let tasks = vec![
make_alter_logical_table_add_column_task(None, "table1", vec!["column1".to_string()]),
// table2 not exists
make_alter_logical_table_add_column_task(None, "table2", vec!["column2".to_string()]),
];
let mut procedure = AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, TableNotFound { .. });
}
#[tokio::test]
async fn test_on_prepare() {
let cluster_id = 1;
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
// Creates physical table
let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
// Creates 3 logical tables
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table3").await;
let tasks = vec![
make_alter_logical_table_add_column_task(None, "table1", vec!["column1".to_string()]),
make_alter_logical_table_add_column_task(None, "table2", vec!["column2".to_string()]),
make_alter_logical_table_add_column_task(None, "table3", vec!["column3".to_string()]),
];
let mut procedure = AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context);
let result = procedure.on_prepare().await;
assert_matches!(result, Ok(Status::Executing { persist: true }));
}
#[tokio::test]
async fn test_on_update_metadata() {
let cluster_id = 1;
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
// Creates physical table
let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
// Creates 5 logical tables
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table3").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table4").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table5").await;
let tasks = vec![
make_alter_logical_table_add_column_task(None, "table1", vec!["new_col".to_string()]),
make_alter_logical_table_add_column_task(None, "table2", vec!["new_col".to_string()]),
make_alter_logical_table_add_column_task(None, "table3", vec!["new_col".to_string()]),
];
let mut procedure = AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context);
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
}
#[tokio::test]
async fn test_on_part_duplicate_alter_request() {
let cluster_id = 1;
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
// Creates physical table
let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await;
// Creates 2 logical tables
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await;
let tasks = vec![
make_alter_logical_table_add_column_task(None, "table1", vec!["col_0".to_string()]),
make_alter_logical_table_add_column_task(None, "table2", vec!["col_0".to_string()]),
];
let mut procedure =
AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context.clone());
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
// re-alter
let tasks = vec![
make_alter_logical_table_add_column_task(
None,
"table1",
vec!["col_0".to_string(), "new_col_1".to_string()],
),
make_alter_logical_table_add_column_task(
None,
"table2",
vec![
"col_0".to_string(),
"new_col_2".to_string(),
"new_col_1".to_string(),
],
),
];
let mut procedure =
AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context.clone());
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let table_name_keys = vec![
TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table1"),
TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table2"),
];
let table_ids = ddl_context
.table_metadata_manager
.table_name_manager()
.batch_get(table_name_keys)
.await
.unwrap()
.into_iter()
.map(|x| x.unwrap().table_id())
.collect::<Vec<_>>();
let tables = ddl_context
.table_metadata_manager
.table_info_manager()
.batch_get(&table_ids)
.await
.unwrap();
let table1 = tables.get(&table_ids[0]).unwrap();
let table2 = tables.get(&table_ids[1]).unwrap();
assert_eq!(table1.table_info.name, "table1");
assert_eq!(table2.table_info.name, "table2");
let table1_cols = table1
.table_info
.meta
.schema
.column_schemas
.iter()
.map(|x| x.name.clone())
.collect::<Vec<_>>();
assert_eq!(
table1_cols,
vec![
"col_0".to_string(),
"cpu".to_string(),
"host".to_string(),
"new_col_1".to_string(),
"ts".to_string()
]
);
let table2_cols = table2
.table_info
.meta
.schema
.column_schemas
.iter()
.map(|x| x.name.clone())
.collect::<Vec<_>>();
assert_eq!(
table2_cols,
vec![
"col_0".to_string(),
"cpu".to_string(),
"host".to_string(),
"new_col_1".to_string(),
"new_col_2".to_string(),
"ts".to_string()
]
);
}

View File

@@ -30,8 +30,10 @@ use table::metadata::RawTableInfo;
use crate::datanode_manager::HandleResponse;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;
@@ -41,7 +43,7 @@ use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager
// Note: this code may be duplicated with others.
// However, it's by design, ensures the tests are easy to be modified or added.
fn test_create_logical_table_task(name: &str) -> CreateTableTask {
pub(crate) fn test_create_logical_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
@@ -86,7 +88,7 @@ fn test_create_logical_table_task(name: &str) -> CreateTableTask {
// Note: this code may be duplicated with others.
// However, it's by design, ensures the tests are easy to be modified or added.
fn test_create_physical_table_task(name: &str) -> CreateTableTask {
pub(crate) fn test_create_physical_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
@@ -135,7 +137,7 @@ async fn test_on_prepare_physical_table_not_found() {
assert_matches!(err, Error::TableRouteNotFound { .. });
}
async fn create_physical_table_metadata(
pub(crate) async fn create_physical_table_metadata(
ddl_context: &DdlContext,
table_info: RawTableInfo,
table_route: TableRouteValue,

View File

@@ -28,8 +28,10 @@ use common_telemetry::debug;
use crate::datanode_manager::HandleResponse;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
use crate::error;
use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;

View File

@@ -19,9 +19,7 @@ use snafu::{ensure, location, Location, OptionExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use table::metadata::TableId;
use crate::error::{
EmptyCreateTableTasksSnafu, Error, Result, TableNotFoundSnafu, UnsupportedSnafu,
};
use crate::error::{Error, Result, TableNotFoundSnafu, UnsupportedSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
@@ -98,7 +96,8 @@ pub async fn check_and_get_physical_table_id(
None => Some(current_physical_table_name),
};
}
let physical_table_name = physical_table_name.context(EmptyCreateTableTasksSnafu)?;
// Safety: `physical_table_name` is `Some` here
let physical_table_name = physical_table_name.unwrap();
table_metadata_manager
.table_name_manager()
.get(physical_table_name)
@@ -108,3 +107,22 @@ pub async fn check_and_get_physical_table_id(
})
.map(|table| table.table_id())
}
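/// Resolves the physical table id behind a logical table: looks up the
/// logical table id by name, then follows its table route to the physical
/// table id.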
pub async fn get_physical_table_id(
table_metadata_manager: &TableMetadataManagerRef,
logical_table_name: TableNameKey<'_>,
) -> Result<TableId> {
let logical_table_id = table_metadata_manager
.table_name_manager()
.get(logical_table_name)
.await?
.context(TableNotFoundSnafu {
table_name: logical_table_name.to_string(),
})
.map(|table| table.table_id())?;
table_metadata_manager
.table_route_manager()
.get_physical_table_id(logical_table_id)
.await
}

View File

@@ -24,6 +24,7 @@ use store_api::storage::TableId;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
@@ -33,7 +34,7 @@ use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
use crate::error::{
self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
self, EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
@@ -137,6 +138,16 @@ impl DdlManager {
})
},
),
(
AlterLogicalTablesProcedure::TYPE_NAME,
&|context: DdlContext| -> BoxedProcedureLoader {
Box::new(move |json: &str| {
let context = context.clone();
AlterLogicalTablesProcedure::from_json(json, context)
.map(|p| Box::new(p) as _)
})
},
),
(
DropTableProcedure::TYPE_NAME,
&|context: DdlContext| -> BoxedProcedureLoader {
@@ -217,7 +228,7 @@ impl DdlManager {
}
#[tracing::instrument(skip_all)]
/// Submits and executes a create table task.
/// Submits and executes multiple create-logical-table tasks.
pub async fn submit_create_logical_table_tasks(
&self,
cluster_id: ClusterId,
@@ -238,6 +249,28 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes multiple alter-logical-table tasks.
pub async fn submit_alter_logical_table_tasks(
&self,
cluster_id: ClusterId,
alter_table_tasks: Vec<AlterTableTask>,
physical_table_id: TableId,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = AlterLogicalTablesProcedure::new(
cluster_id,
alter_table_tasks,
physical_table_id,
context,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
pub async fn submit_drop_table_task(
@@ -510,7 +543,12 @@ async fn handle_create_logical_table_tasks(
cluster_id: ClusterId,
mut create_table_tasks: Vec<CreateTableTask>,
) -> Result<SubmitDdlTaskResponse> {
ensure!(!create_table_tasks.is_empty(), EmptyCreateTableTasksSnafu);
ensure!(
!create_table_tasks.is_empty(),
EmptyDdlTasksSnafu {
name: "create logical tables"
}
);
let physical_table_id = utils::check_and_get_physical_table_id(
&ddl_manager.table_metadata_manager,
&create_table_tasks,
@@ -566,7 +604,42 @@ async fn handle_drop_database_task(
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
table_id: None,
..Default::default()
})
}
async fn handle_alter_logical_table_tasks(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
alter_table_tasks: Vec<AlterTableTask>,
) -> Result<SubmitDdlTaskResponse> {
ensure!(
!alter_table_tasks.is_empty(),
EmptyDdlTasksSnafu {
name: "alter logical tables"
}
);
// Uses the physical table id of the first logical table; the procedure verifies that all tasks share it.
let first_table = TableNameKey {
catalog: &alter_table_tasks[0].alter_table.catalog_name,
schema: &alter_table_tasks[0].alter_table.schema_name,
table: &alter_table_tasks[0].alter_table.table_name,
};
let physical_table_id =
utils::get_physical_table_id(&ddl_manager.table_metadata_manager, first_table).await?;
let num_logical_tables = alter_table_tasks.len();
let (id, _) = ddl_manager
.submit_alter_logical_table_tasks(cluster_id, alter_table_tasks, physical_table_id)
.await?;
info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}");
let procedure_id = id.to_string();
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}
@@ -604,8 +677,10 @@ impl ProcedureExecutor for DdlManager {
CreateLogicalTables(create_table_tasks) => {
handle_create_logical_table_tasks(self, cluster_id, create_table_tasks).await
}
AlterLogicalTables(alter_table_tasks) => {
handle_alter_logical_table_tasks(self, cluster_id, alter_table_tasks).await
}
DropLogicalTables(_) => todo!(),
AlterLogicalTables(_) => todo!(),
DropDatabase(drop_database_task) => {
handle_drop_database_task(self, cluster_id, drop_database_task).await
}

View File

@@ -398,11 +398,14 @@ pub enum Error {
#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedLogicalRouteTable { location: Location, err_msg: String },
#[snafu(display("The tasks of create tables cannot be empty"))]
EmptyCreateTableTasks { location: Location },
#[snafu(display("The tasks of {} cannot be empty", name))]
EmptyDdlTasks { name: String, location: Location },
#[snafu(display("Metadata corruption: {}", err_msg))]
MetadataCorruption { err_msg: String, location: Location },
#[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
AlterLogicalTablesInvalidArguments { err_msg: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -462,7 +465,8 @@ impl ErrorExt for Error {
ProcedureNotFound { .. }
| PrimaryKeyNotFound { .. }
| EmptyKey { .. }
| InvalidEngineType { .. } => StatusCode::InvalidArguments,
| InvalidEngineType { .. }
| AlterLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments,
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
@@ -480,8 +484,8 @@ impl ErrorExt for Error {
ParseProcedureId { .. }
| InvalidNumTopics { .. }
| EmptyCreateTableTasks { .. }
| SchemaNotFound { .. } => StatusCode::InvalidArguments,
| SchemaNotFound { .. }
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -461,7 +461,7 @@ impl TableMetadataManager {
Ok(())
}
pub fn max_logical_tables_per_batch(&self) -> usize {
pub fn create_logical_tables_metadata_chunk_size(&self) -> usize {
// The chunk size is max_txn_ops / 3 because creating the metadata for
// one logical table takes three transaction operations.
self.kv_backend.max_txn_ops() / 3
@@ -686,6 +686,64 @@ impl TableMetadataManager {
Ok(())
}
pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
self.kv_backend.max_txn_ops()
}
pub async fn batch_update_table_info_values(
&self,
table_info_value_pairs: Vec<(TableInfoValue, RawTableInfo)>,
) -> Result<()> {
let len = table_info_value_pairs.len();
let mut txns = Vec::with_capacity(len);
struct OnFailure<F, R>
where
F: FnOnce(&Vec<TxnOpResponse>) -> R,
{
table_info_value: TableInfoValue,
on_update_table_info_failure: F,
}
let mut on_failures = Vec::with_capacity(len);
for (table_info_value, new_table_info) in table_info_value_pairs {
let table_id = table_info_value.table_info.ident.table_id;
let new_table_info_value = table_info_value.update(new_table_info);
let (update_table_info_txn, on_update_table_info_failure) =
self.table_info_manager().build_update_txn(
table_id,
&DeserializedValueWithBytes::from_inner(table_info_value),
&new_table_info_value,
)?;
txns.push(update_table_info_txn);
on_failures.push(OnFailure {
table_info_value: new_table_info_value,
on_update_table_info_failure,
});
}
let txn = Txn::merge_all(txns);
let r = self.kv_backend.txn(txn).await?;
if !r.succeeded {
for on_failure in on_failures {
let remote_table_info = (on_failure.on_update_table_info_failure)(&r.responses)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the updating table info",
})?
.into_inner();
let op_name = "the batch updating table info";
ensure_values!(remote_table_info, on_failure.table_info_value, op_name);
}
}
Ok(())
}
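A minimal caller-side sketch, assuming the pair values are cloneable and `pairs` holds the `(current, new)` table infos:
// Hypothetical: chunk the updates so each merged txn stays within the KV
// backend's max_txn_ops limit.
let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size();
for chunk in pairs.chunks(chunk_size) {
table_metadata_manager
.batch_update_table_info_values(chunk.to_vec())
.await?;
}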
pub async fn update_table_route(
&self,
table_id: TableId,

View File

@@ -23,9 +23,7 @@ use api::v1::meta::{
DropTableTasks as PbDropTableTasks, Partition, ProcedureId,
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterExpr, CreateTableExpr, DropDatabaseExpr, DropTableExpr, SemanticType, TruncateTableExpr,
};
use api::v1::{AlterExpr, CreateTableExpr, DropDatabaseExpr, DropTableExpr, TruncateTableExpr};
use base64::engine::general_purpose;
use base64::Engine as _;
use prost::Message;
@@ -67,6 +65,15 @@ impl DdlTask {
)
}
pub fn new_alter_logical_tables(table_data: Vec<AlterExpr>) -> Self {
DdlTask::AlterLogicalTables(
table_data
.into_iter()
.map(|alter_table| AlterTableTask { alter_table })
.collect(),
)
}
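A hedged sketch of building a batch-alter task; `alter_expr_a` and `alter_expr_b` are assumed `AlterExpr` values:
// Hypothetical: wrap a batch of AlterExprs into one DDL task and submit it.
let task = DdlTask::new_alter_logical_tables(vec![alter_expr_a, alter_expr_b]);
let request = SubmitDdlTaskRequest { task };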
pub fn new_drop_table(
catalog: String,
schema: String,
@@ -396,31 +403,7 @@ impl CreateTableTask {
.column_defs
.sort_unstable_by(|a, b| a.name.cmp(&b.name));
// compute new indices of sorted columns
// this part won't do any check or verification.
let mut primary_key_indices = Vec::with_capacity(self.create_table.primary_keys.len());
let mut value_indices =
Vec::with_capacity(self.create_table.column_defs.len() - primary_key_indices.len() - 1);
let mut timestamp_index = None;
for (index, col) in self.create_table.column_defs.iter().enumerate() {
if self.create_table.primary_keys.contains(&col.name) {
primary_key_indices.push(index);
} else if col.semantic_type == SemanticType::Timestamp as i32 {
timestamp_index = Some(index);
} else {
value_indices.push(index);
}
}
// overwrite table info
self.table_info
.meta
.schema
.column_schemas
.sort_unstable_by(|a, b| a.name.cmp(&b.name));
self.table_info.meta.schema.timestamp_index = timestamp_index;
self.table_info.meta.primary_key_indices = primary_key_indices;
self.table_info.meta.value_indices = value_indices;
self.table_info.sort_columns();
}
}
@@ -707,7 +690,8 @@ mod tests {
"column1".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
)
.with_time_index(true),
ColumnSchema::new(
"column2".to_string(),
ConcreteDataType::float64_datatype(),

View File

@@ -32,8 +32,10 @@ use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, Crea
use common_meta::ddl::create_table::*;
use common_meta::ddl::drop_table::executor::DropTableExecutor;
use common_meta::ddl::drop_table::DropTableProcedure;
use common_meta::ddl::test_util::create_table::build_raw_table_info_from_expr;
use common_meta::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
use common_meta::ddl::test_util::columns::TestColumnDefBuilder;
use common_meta::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::DeserializedValueWithBytes;
@@ -303,7 +305,7 @@ async fn test_on_datanode_create_logical_regions() {
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: false }));
assert!(matches!(
procedure.creator.data.state(),
procedure.data.state(),
&CreateTablesState::CreateMetadata
));

View File

@@ -508,23 +508,20 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Do not support creating tables in multiple catalogs: {}",
catalog_names
))]
CreateTableWithMultiCatalogs {
catalog_names: String,
#[snafu(display("Do not support {} in multiple catalogs", ddl_name))]
DdlWithMultiCatalogs {
ddl_name: String,
location: Location,
},
#[snafu(display("Do not support creating tables in multiple schemas: {}", schema_names))]
CreateTableWithMultiSchemas {
schema_names: String,
#[snafu(display("Do not support {} in multiple schemas", ddl_name))]
DdlWithMultiSchemas {
ddl_name: String,
location: Location,
},
#[snafu(display("Empty creating table expr"))]
EmptyCreateTableExpr { location: Location },
#[snafu(display("Empty {} expr", name))]
EmptyDdlExpr { name: String, location: Location },
#[snafu(display("Failed to create logical tables: {}", reason))]
CreateLogicalTables { reason: String, location: Location },
@@ -650,9 +647,9 @@ impl ErrorExt for Error {
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::CreateTableWithMultiCatalogs { .. }
| Error::CreateTableWithMultiSchemas { .. }
| Error::EmptyCreateTableExpr { .. }
Error::DdlWithMultiCatalogs { .. }
| Error::DdlWithMultiSchemas { .. }
| Error::EmptyDdlExpr { .. }
| Error::InvalidPartitionRule { .. }
| Error::ParseSqlValue { .. } => StatusCode::InvalidArguments,

View File

@@ -256,6 +256,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
) -> Result<()> {
let mut create_tables = vec![];
let mut alter_tables = vec![];
for req in &requests.inserts {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
@@ -264,16 +265,19 @@ impl Inserter {
Some(table) => {
// TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`)
validate_request_with_table(req, &table)?;
self.alter_table_on_demand(req, table, ctx, statement_executor)
.await?
let alter_expr = self.get_alter_table_expr_on_demand(req, table, ctx)?;
if let Some(alter_expr) = alter_expr {
alter_tables.push(alter_expr);
}
}
None => {
create_tables.push(req);
}
}
}
if !create_tables.is_empty() {
if let Some(on_physical_table) = on_physical_table {
if let Some(on_physical_table) = on_physical_table {
if !create_tables.is_empty() {
// Creates logical tables in batch.
self.create_logical_tables(
create_tables,
@@ -282,10 +286,19 @@ impl Inserter {
statement_executor,
)
.await?;
} else {
for req in create_tables {
self.create_table(req, ctx, statement_executor).await?;
}
}
if !alter_tables.is_empty() {
// Alter logical tables in batch.
statement_executor
.alter_logical_tables(alter_tables)
.await?;
}
} else {
for req in create_tables {
self.create_table(req, ctx, statement_executor).await?;
}
for alter_expr in alter_tables.into_iter() {
statement_executor.alter_table_inner(alter_expr).await?;
}
}
@@ -364,13 +377,12 @@ impl Inserter {
.context(CatalogSnafu)
}
async fn alter_table_on_demand(
fn get_alter_table_expr_on_demand(
&self,
req: &RowInsertRequest,
table: TableRef,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<Option<AlterExpr>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
let table_name = table.table_info().name.clone();
@@ -380,39 +392,15 @@ impl Inserter {
let add_columns = extract_new_columns(&table.schema(), column_exprs)
.context(FindNewColumnsOnInsertionSnafu)?;
let Some(add_columns) = add_columns else {
return Ok(());
return Ok(None);
};
info!(
"Adding new columns: {:?} to table: {}.{}.{}",
add_columns, catalog_name, schema_name, table_name
);
let alter_table_expr = AlterExpr {
Ok(Some(AlterExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
};
let res = statement_executor.alter_table_inner(alter_table_expr).await;
match res {
Ok(_) => {
info!(
"Successfully added new columns to table: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(())
}
Err(err) => {
error!(
"Failed to add new columns to table: {}.{}.{}: {}",
catalog_name, schema_name, table_name, err
);
Err(err)
}
}
}))
}
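A hedged sketch of the pattern this refactor enables inside `Inserter`; the `alter_exprs` buffer is an assumption:
// Hypothetical: build the alter expr without executing it, so a batch of
// them can be submitted together later.
if let Some(expr) = self.get_alter_table_expr_on_demand(req, table, ctx)? {
alter_exprs.push(expr);
}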
/// Create a table with schema from insert request.

View File

@@ -26,6 +26,11 @@ lazy_static! {
"table operator create table"
)
.unwrap();
pub static ref DIST_ALTER_TABLES: Histogram = register_histogram!(
"greptime_table_operator_alter_tables",
"table operator alter table"
)
.unwrap();
pub static ref DIST_INGEST_ROW_COUNT: IntCounter = register_int_counter!(
"greptime_table_operator_ingest_rows",
"table operator ingest rows"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
@@ -57,8 +57,8 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
CreateLogicalTablesSnafu, CreateTableInfoSnafu, CreateTableWithMultiCatalogsSnafu,
CreateTableWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyCreateTableExprSnafu,
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
@@ -242,20 +242,18 @@ impl StatementExecutor {
create_table_exprs: &[CreateTableExpr],
) -> Result<Vec<TableRef>> {
let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
ensure!(!create_table_exprs.is_empty(), EmptyCreateTableExprSnafu);
ensure!(
!create_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "create table"
}
);
ensure!(
create_table_exprs
.windows(2)
.all(|expr| expr[0].catalog_name == expr[1].catalog_name),
CreateTableWithMultiCatalogsSnafu {
catalog_names: create_table_exprs
.iter()
.map(|x| x.catalog_name.as_str())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>()
.join(",")
.to_string()
DdlWithMultiCatalogsSnafu {
ddl_name: "create tables"
}
);
let catalog_name = create_table_exprs[0].catalog_name.to_string();
@@ -264,15 +262,8 @@ impl StatementExecutor {
create_table_exprs
.windows(2)
.all(|expr| expr[0].schema_name == expr[1].schema_name),
CreateTableWithMultiSchemasSnafu {
schema_names: create_table_exprs
.iter()
.map(|x| x.schema_name.as_str())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>()
.join(",")
.to_string()
DdlWithMultiSchemasSnafu {
ddl_name: "create tables"
}
);
let schema_name = create_table_exprs[0].schema_name.to_string();
@@ -329,6 +320,38 @@ impl StatementExecutor {
.collect())
}
#[tracing::instrument(skip_all)]
pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
ensure!(
!alter_table_exprs.is_empty(),
EmptyDdlExprSnafu {
name: "alter table"
}
);
ensure!(
alter_table_exprs
.windows(2)
.all(|expr| expr[0].catalog_name == expr[1].catalog_name),
DdlWithMultiCatalogsSnafu {
ddl_name: "alter tables",
}
);
ensure!(
alter_table_exprs
.windows(2)
.all(|expr| expr[0].schema_name == expr[1].schema_name),
DdlWithMultiSchemasSnafu {
ddl_name: "alter tables",
}
);
self.alter_logical_tables_procedure(alter_table_exprs)
.await?;
Ok(Output::new_with_affected_rows(0))
}
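A hedged end-to-end sketch of the new entry point; `statement_executor` and the exprs are assumed bindings, and the exprs must share one catalog and schema as the guards above require:
// Hypothetical: batch-alter two logical tables in one procedure round-trip.
let output = statement_executor
.alter_logical_tables(vec![alter_expr_a, alter_expr_b])
.await?;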
#[tracing::instrument(skip_all)]
pub async fn drop_table(&self, table_name: TableName, drop_if_exists: bool) -> Result<Output> {
if let Some(table) = self
@@ -443,7 +466,7 @@ impl StatementExecutor {
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.builder_with_alter_kind(table_name, &request.alter_kind, false)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
@@ -551,6 +574,20 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu)
}
async fn alter_logical_tables_procedure(
&self,
tables_data: Vec<AlterExpr>,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_alter_logical_tables(tables_data),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn drop_table_procedure(
&self,
table_name: &TableName,

View File

@@ -188,9 +188,12 @@ impl TableMeta {
&self,
table_name: &str,
alter_kind: &AlterKind,
add_if_not_exists: bool,
) -> Result<TableMetaBuilder> {
match alter_kind {
AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
AlterKind::AddColumns { columns } => {
self.add_columns(table_name, columns, add_if_not_exists)
}
AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
// No need to rebuild table meta when renaming tables.
AlterKind::RenameTable { .. } => {
@@ -248,6 +251,7 @@ impl TableMeta {
&self,
table_name: &str,
requests: &[AddColumnRequest],
add_if_not_exists: bool,
) -> Result<TableMetaBuilder> {
let table_schema = &self.schema;
let mut meta_builder = self.new_meta_builder();
@@ -255,7 +259,31 @@ impl TableMeta {
self.primary_key_indices.iter().collect();
let mut names = HashSet::with_capacity(requests.len());
let mut new_requests = Vec::with_capacity(requests.len());
let requests = if add_if_not_exists {
for col_to_add in requests {
if let Some(column_schema) =
table_schema.column_schema_by_name(&col_to_add.column_schema.name)
{
// If the column already exists, check that its type matches the request.
ensure!(
column_schema.data_type == col_to_add.column_schema.data_type,
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"column {} already exists with different type",
col_to_add.column_schema.name
),
}
);
} else {
new_requests.push(col_to_add.clone());
}
}
&new_requests[..]
} else {
requests
};
for col_to_add in requests {
ensure!(
names.insert(&col_to_add.column_schema.name),
@@ -630,6 +658,44 @@ pub struct RawTableInfo {
pub table_type: TableType,
}
impl RawTableInfo {
/// Sorts the columns in [RawTableInfo]; logical tables require a deterministic column order.
pub fn sort_columns(&mut self) {
let column_schemas = &self.meta.schema.column_schemas;
let primary_keys = self
.meta
.primary_key_indices
.iter()
.map(|index| column_schemas[*index].name.clone())
.collect::<HashSet<_>>();
self.meta
.schema
.column_schemas
.sort_unstable_by(|a, b| a.name.cmp(&b.name));
// Compute new indices of sorted columns
let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
let mut timestamp_index = None;
let mut value_indices =
Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1);
for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
if primary_keys.contains(&column_schema.name) {
primary_key_indices.push(index);
} else if column_schema.is_time_index() {
timestamp_index = Some(index);
} else {
value_indices.push(index);
}
}
// Overwrite table meta
self.meta.schema.timestamp_index = timestamp_index;
self.meta.primary_key_indices = primary_key_indices;
self.meta.value_indices = value_indices;
}
}
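A hedged sketch of the invariant `sort_columns` establishes; `raw_info` is an assumed `RawTableInfo` value:
// Hypothetical check: columns are in lexicographic order afterwards, and the
// recomputed index vectors point at the reordered columns.
raw_info.sort_columns();
let names: Vec<&str> = raw_info
.meta
.schema
.column_schemas
.iter()
.map(|c| c.name.as_str())
.collect();
debug_assert!(names.windows(2).all(|w| w[0] <= w[1]));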
impl From<TableInfo> for RawTableInfo {
fn from(info: TableInfo) -> RawTableInfo {
RawTableInfo {
@@ -731,7 +797,7 @@ mod tests {
};
let builder = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap();
builder.build().unwrap()
}
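The updated tests above pass `false` to keep the old semantics; a hedged sketch of the new flag in action, assuming `alter_kind` adds a column that already exists with the same type:
// Hypothetical: with add_if_not_exists = true, re-adding an existing column
// of the same type is filtered out instead of failing with TableColumnExists.
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind, true)
.unwrap()
.build()
.unwrap();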
@@ -761,7 +827,7 @@ mod tests {
};
let builder = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap();
builder.build().unwrap()
}
@@ -808,7 +874,7 @@ mod tests {
names: vec![String::from("col2"), String::from("my_field")],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap()
.build()
.unwrap();
@@ -863,7 +929,7 @@ mod tests {
names: vec![String::from("col3"), String::from("col1")],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.unwrap()
.build()
.unwrap();
@@ -903,7 +969,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnExists, err.status_code());
@@ -933,7 +999,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -955,7 +1021,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
@@ -978,7 +1044,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -989,7 +1055,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.builder_with_alter_kind("my_table", &alter_kind, false)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());