refactor: alter logical tables (#3618)

* refactor: on prepare

* refactor: on create regions

* refactor: update metadata
This commit is contained in:
JeremyHi
2024-04-02 14:21:34 +08:00
committed by GitHub
parent 6c316d268f
commit a61fb98e4a
9 changed files with 462 additions and 328 deletions

View File

@@ -23,7 +23,6 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{Context, LockKey, Procedure, Status};
use common_telemetry::{info, warn};
use futures_util::future;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
@@ -32,11 +31,10 @@ use strum::AsRefStr;
use table::metadata::TableId;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::{physical_table_metadata, DdlContext};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::find_leaders;
@@ -128,7 +126,7 @@ impl AlterLogicalTablesProcedure {
});
}
// Collects responses from all the alter region tasks.
// Collects responses from datanodes.
let phy_raw_schemas = future::join_all(alter_region_tasks)
.await
.into_iter()
@@ -163,44 +161,8 @@ impl AlterLogicalTablesProcedure {
}
pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
if !self.data.physical_columns.is_empty() {
let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
// Generates new table info
let old_raw_table_info = physical_table_info.table_info.clone();
let new_raw_table_info = physical_table_metadata::build_new_physical_table_info(
old_raw_table_info,
&self.data.physical_columns,
);
// Updates physical table's metadata
self.context
.table_metadata_manager
.update_table_info(
DeserializedValueWithBytes::from_inner(physical_table_info.clone()),
new_raw_table_info,
)
.await?;
}
let table_info_values = self.build_update_metadata()?;
let manager = &self.context.table_metadata_manager;
let chunk_size = manager.batch_update_table_info_value_chunk_size();
if table_info_values.len() > chunk_size {
let chunks = table_info_values
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|check| check.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
manager.batch_update_table_info_values(chunk).await?;
}
} else {
manager
.batch_update_table_info_values(table_info_values)
.await?;
}
self.update_physical_table_metadata().await?;
self.update_logical_tables_metadata().await?;
self.data.state = AlterTablesState::InvalidateTableCache;
Ok(Status::executing(true))

View File

@@ -13,16 +13,70 @@
// limitations under the License.
use common_grpc_expr::alter_expr_to_request;
use common_telemetry::warn;
use itertools::Itertools;
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableInfo};
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::physical_table_metadata;
use crate::error;
use crate::error::{ConvertAlterTableRequestSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::DeserializedValueWithBytes;
use crate::rpc::ddl::AlterTableTask;
impl AlterLogicalTablesProcedure {
pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
if self.data.physical_columns.is_empty() {
warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables");
return Ok(());
}
let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
// Generates new table info
let old_raw_table_info = physical_table_info.table_info.clone();
let new_raw_table_info = physical_table_metadata::build_new_physical_table_info(
old_raw_table_info,
&self.data.physical_columns,
);
// Updates physical table's metadata
self.context
.table_metadata_manager
.update_table_info(
DeserializedValueWithBytes::from_inner(physical_table_info.clone()),
new_raw_table_info,
)
.await?;
Ok(())
}
pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
let table_info_values = self.build_update_metadata()?;
let manager = &self.context.table_metadata_manager;
let chunk_size = manager.batch_update_table_info_value_chunk_size();
if table_info_values.len() > chunk_size {
let chunks = table_info_values
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|check| check.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
manager.batch_update_table_info_values(chunk).await?;
}
} else {
manager
.batch_update_table_info_values(table_info_values)
.await?;
}
Ok(())
}
pub(crate) fn build_update_metadata(&self) -> Result<Vec<(TableInfoValue, RawTableInfo)>> {
let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
for (task, table) in self

View File

@@ -12,45 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::ops::Deref;
mod check;
mod metadata;
mod region_request;
mod update_metadata;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader};
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use common_telemetry::warn;
use futures_util::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use crate::cache_invalidator::Context;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::{physical_table_metadata, DdlContext};
use crate::error::{
DecodeJsonSnafu, MetadataCorruptionSnafu, Result, TableAlreadyExistsSnafu,
TableInfoNotFoundSnafu,
};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
use crate::key::table_route::TableRouteValue;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::table_name::TableName;
use crate::rpc::router::{find_leaders, RegionRoute};
use crate::{metrics, ClusterId};
pub struct CreateLogicalTablesProcedure {
@@ -67,17 +54,18 @@ impl CreateLogicalTablesProcedure {
physical_table_id: TableId,
context: DdlContext,
) -> Self {
let len = tasks.len();
let data = CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
physical_columns: vec![],
};
Self { context, data }
Self {
context,
data: CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_already_exists: vec![],
physical_table_id,
physical_region_numbers: vec![],
physical_columns: vec![],
},
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
@@ -96,91 +84,45 @@ impl CreateLogicalTablesProcedure {
/// - Failed to check whether tables exist.
/// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
self.check_input_tasks()?;
// Sets physical region numbers
let physical_table_id = self.data.physical_table_id();
let physical_region_numbers = manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;
self.data
.set_physical_region_numbers(physical_region_numbers);
self.fill_physical_table_info().await?;
// Checks if the tables exist
let table_name_keys = self
.data
.all_create_table_exprs()
.iter()
.map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name))
.collect::<Vec<_>>();
let already_exists_tables_ids = manager
.table_name_manager()
.batch_get(table_name_keys)
.await?
.iter()
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();
// Validates the tasks
let tasks = &mut self.data.tasks;
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
if table_id.is_some() {
// If a table already exists, we just ignore it.
ensure!(
task.create_table.create_if_not_exists,
TableAlreadyExistsSnafu {
table_name: task.create_table.table_name.to_string(),
}
);
continue;
}
}
self.check_tables_already_exist().await?;
// If all tables already exist, returns the table_ids.
if already_exists_tables_ids.iter().all(Option::is_some) {
if self
.data
.table_ids_already_exists
.iter()
.all(Option::is_some)
{
return Ok(Status::done_with_output(
already_exists_tables_ids
.into_iter()
self.data
.table_ids_already_exists
.drain(..)
.flatten()
.collect::<Vec<_>>(),
));
}
// Allocates table ids and sort columns on their names.
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
let table_id = if let Some(table_id) = table_id {
*table_id
} else {
self.context
.table_metadata_allocator
.allocate_table_id(task)
.await?
};
task.set_table_id(table_id);
self.allocate_table_ids().await?;
// sort columns in task
task.sort_columns();
}
self.data
.set_table_ids_already_exists(already_exists_tables_ids);
self.data.state = CreateTablesState::DatanodeCreateRegions;
Ok(Status::executing(true))
}
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let physical_table_id = self.data.physical_table_id();
let (_, physical_table_route) = self
.context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.get_physical_table_route(self.data.physical_table_id)
.await?;
let region_routes = &physical_table_route.region_routes;
self.create_regions(region_routes).await
self.create_regions(&physical_table_route.region_routes)
.await
}
/// Creates table metadata for logical tables and update corresponding physical
@@ -189,179 +131,54 @@ impl CreateLogicalTablesProcedure {
/// Abort(not-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
let physical_table_id = self.data.physical_table_id();
let remaining_tasks = self.data.remaining_tasks();
let num_tables = remaining_tasks.len();
if num_tables > 0 {
let chunk_size = manager.create_logical_tables_metadata_chunk_size();
if num_tables > chunk_size {
let chunks = remaining_tasks
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
manager.create_logical_tables_metadata(chunk).await?;
}
} else {
manager
.create_logical_tables_metadata(remaining_tasks)
.await?;
}
}
// The `table_id` MUST be collected after the [Prepare::Prepare],
// ensures the all `table_id`s have been allocated.
let table_ids = self
.data
.tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
if !self.data.physical_columns.is_empty() {
// fetch old physical table's info
let physical_table_info = self
.context
.table_metadata_manager
.table_info_manager()
.get(self.data.physical_table_id)
.await?
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id - {}", self.data.physical_table_id),
})?;
// generate new table info
let new_table_info = self
.data
.build_new_physical_table_info(&physical_table_info);
let physical_table_name = TableName::new(
&new_table_info.catalog_name,
&new_table_info.schema_name,
&new_table_info.name,
);
// update physical table's metadata
self.context
.table_metadata_manager
.update_table_info(physical_table_info, new_table_info)
.await?;
// invalid table cache
self.context
.cache_invalidator
.invalidate(
&Context::default(),
vec![
CacheIdent::TableId(self.data.physical_table_id),
CacheIdent::TableName(physical_table_name),
],
)
.await?;
} else {
warn!("No physical columns found, leaving the physical table's schema unchanged");
}
info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");
self.update_physical_table_metadata().await?;
let table_ids = self.create_logical_tables_metadata().await?;
Ok(Status::done_with_output(table_ids))
}
fn create_region_request_builder(
&self,
physical_table_id: TableId,
task: &CreateTableTask,
) -> Result<CreateRequestBuilder> {
let create_expr = &task.create_table;
let template = build_template(create_expr)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}
fn one_datanode_region_requests(
&self,
datanode: &Peer,
region_routes: &[RegionRoute],
) -> Result<CreateRequests> {
let create_tables_data = &self.data;
let tasks = &create_tables_data.tasks;
let physical_table_id = create_tables_data.physical_table_id();
let regions = find_leader_regions(region_routes, datanode);
let mut requests = Vec::with_capacity(tasks.len() * regions.len());
for task in tasks {
let create_table_expr = &task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let logical_table_id = task.table_info.ident.table_id;
let storage_path = region_storage_path(catalog, schema);
let request_builder = self.create_region_request_builder(physical_table_id, task)?;
for region_number in &regions {
let region_id = RegionId::new(logical_table_id, *region_number);
let create_region_request =
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?;
requests.push(create_region_request);
}
}
Ok(CreateRequests { requests })
}
async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let requester = self.context.datanode_manager.datanode(&datanode).await;
let creates = self.one_datanode_region_requests(&datanode, region_routes)?;
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(PbRegionRequest::Creates(creates)),
};
for peer in leaders {
let requester = self.context.datanode_manager.datanode(&peer).await;
let request = self.make_request(&peer, region_routes)?;
create_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
.map_err(add_peer_context_if_needed(peer))
});
}
// collect response from datanodes
let raw_schemas = join_all(create_region_tasks)
// Collects response from datanodes.
let phy_raw_schemas = join_all(create_region_tasks)
.await
.into_iter()
.map(|response| {
response.map(|mut response| response.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY))
})
.map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;
if raw_schemas.is_empty() {
if phy_raw_schemas.is_empty() {
self.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(false));
}
// verify all datanodes return the same raw schemas
// Safety: previous check ensures this vector is not empty.
let first = raw_schemas.first().unwrap();
// Verify all the physical schemas are the same
// Safety: previous check ensures this vec is not empty
let first = phy_raw_schemas.first().unwrap();
ensure!(
raw_schemas.iter().all(|x| x == first),
phy_raw_schemas.iter().all(|x| x == first),
MetadataCorruptionSnafu {
err_msg: "Raw schemas from datanodes are not the same"
err_msg: "The physical schemas from datanodes are not the same."
}
);
// decode raw schemas and store it
if let Some(raw_schema) = first {
let physical_columns =
ColumnMetadata::decode_list(raw_schema).context(DecodeJsonSnafu)?;
self.data.physical_columns = physical_columns;
// Decodes the physical raw schemas
if let Some(phy_raw_schemas) = first {
self.data.physical_columns =
ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?;
} else {
warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}
@@ -405,7 +222,7 @@ impl Procedure for CreateLogicalTablesProcedure {
let table_ref = self.data.tasks[0].table_ref();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.data.physical_table_id()).into());
lock_key.push(TableLock::Write(self.data.physical_table_id).into());
for task in &self.data.tasks {
lock_key.push(
@@ -437,18 +254,6 @@ impl CreateTablesData {
&self.state
}
fn physical_table_id(&self) -> TableId {
self.physical_table_id
}
fn set_physical_region_numbers(&mut self, physical_region_numbers: Vec<RegionNumber>) {
self.physical_region_numbers = physical_region_numbers;
}
fn set_table_ids_already_exists(&mut self, table_ids_already_exists: Vec<Option<TableId>>) {
self.table_ids_already_exists = table_ids_already_exists;
}
fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
self.tasks
.iter()
@@ -480,21 +285,6 @@ impl CreateTablesData {
})
.collect::<Vec<_>>()
}
/// Generate the new physical table info.
///
/// This method will consumes the physical columns.
fn build_new_physical_table_info(
&mut self,
old_table_info: &DeserializedValueWithBytes<TableInfoValue>,
) -> RawTableInfo {
let raw_table_info = old_table_info.deref().table_info.clone();
physical_table_metadata::build_new_physical_table_info(
raw_table_info,
&self.physical_columns,
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]

View File

@@ -0,0 +1,81 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::ensure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::error::{CreateLogicalTablesInvalidArgumentsSnafu, Result, TableAlreadyExistsSnafu};
use crate::key::table_name::TableNameKey;
impl CreateLogicalTablesProcedure {
pub(crate) fn check_input_tasks(&self) -> Result<()> {
self.check_schema()?;
Ok(())
}
pub(crate) async fn check_tables_already_exist(&mut self) -> Result<()> {
let table_name_keys = self
.data
.all_create_table_exprs()
.iter()
.map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name))
.collect::<Vec<_>>();
let table_ids_already_exists = self
.context
.table_metadata_manager
.table_name_manager()
.batch_get(table_name_keys)
.await?
.iter()
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();
self.data.table_ids_already_exists = table_ids_already_exists;
// Validates the tasks
let tasks = &mut self.data.tasks;
for (task, table_id) in tasks.iter().zip(self.data.table_ids_already_exists.iter()) {
if table_id.is_some() {
// If a table already exists, we just ignore it.
ensure!(
task.create_table.create_if_not_exists,
TableAlreadyExistsSnafu {
table_name: task.create_table.table_name.to_string(),
}
);
continue;
}
}
Ok(())
}
// Checks if the schemas of the tasks are the same
fn check_schema(&self) -> Result<()> {
let is_same_schema = self.data.tasks.windows(2).all(|pair| {
pair[0].create_table.catalog_name == pair[1].create_table.catalog_name
&& pair[0].create_table.schema_name == pair[1].create_table.schema_name
});
ensure!(
is_same_schema,
CreateLogicalTablesInvalidArgumentsSnafu {
err_msg: "Schemas of the tasks are not the same"
}
);
Ok(())
}
}

View File

@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
impl CreateLogicalTablesProcedure {
pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
let physical_region_numbers = self
.context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(self.data.physical_table_id)
.await
.map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?;
self.data.physical_region_numbers = physical_region_numbers;
Ok(())
}
pub(crate) async fn allocate_table_ids(&mut self) -> Result<()> {
for (task, table_id) in self
.data
.tasks
.iter_mut()
.zip(self.data.table_ids_already_exists.iter())
{
let table_id = if let Some(table_id) = table_id {
*table_id
} else {
self.context
.table_metadata_allocator
.allocate_table_id(task)
.await?
};
task.set_table_id(table_id);
// sort columns in task
task.sort_columns();
}
Ok(())
}
}

View File

@@ -0,0 +1,74 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::RegionId;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, RegionRoute};
impl CreateLogicalTablesProcedure {
pub(crate) fn make_request(
&self,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
let tasks = &self.data.tasks;
let regions_on_this_peer = find_leader_regions(region_routes, peer);
let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
for task in tasks {
let create_table_expr = &task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let logical_table_id = task.table_info.ident.table_id;
let storage_path = region_storage_path(catalog, schema);
let request_builder = self.create_region_request_builder(task)?;
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(logical_table_id, *region_number);
let one_region_request =
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?;
requests.push(one_region_request);
}
}
Ok(RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Creates(CreateRequests { requests })),
})
}
fn create_region_request_builder(
&self,
task: &CreateTableTask,
) -> Result<CreateRequestBuilder> {
let create_expr = &task.create_table;
let template = build_template(create_expr)?;
Ok(CreateRequestBuilder::new(
template,
Some(self.data.physical_table_id),
))
}
}

View File

@@ -0,0 +1,128 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use common_telemetry::{info, warn};
use itertools::Itertools;
use snafu::OptionExt;
use table::metadata::TableId;
use crate::cache_invalidator::Context;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::physical_table_metadata;
use crate::error::{Result, TableInfoNotFoundSnafu};
use crate::instruction::CacheIdent;
use crate::table_name::TableName;
impl CreateLogicalTablesProcedure {
pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
if self.data.physical_columns.is_empty() {
warn!("No physical columns found, leaving the physical table's schema unchanged when creating logical tables");
return Ok(());
}
// Fetches old physical table's info
let physical_table_info = self
.context
.table_metadata_manager
.table_info_manager()
.get(self.data.physical_table_id)
.await?
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id - {}", self.data.physical_table_id),
})?;
// Generates new table info
let raw_table_info = physical_table_info.deref().table_info.clone();
let new_table_info = physical_table_metadata::build_new_physical_table_info(
raw_table_info,
&self.data.physical_columns,
);
let physical_table_name = TableName::new(
&new_table_info.catalog_name,
&new_table_info.schema_name,
&new_table_info.name,
);
// Update physical table's metadata
self.context
.table_metadata_manager
.update_table_info(physical_table_info, new_table_info)
.await?;
// Invalid physical table cache
self.context
.cache_invalidator
.invalidate(
&Context::default(),
vec![
CacheIdent::TableId(self.data.physical_table_id),
CacheIdent::TableName(physical_table_name),
],
)
.await?;
Ok(())
}
pub(crate) async fn create_logical_tables_metadata(&mut self) -> Result<Vec<TableId>> {
let remaining_tasks = self.data.remaining_tasks();
let num_tables = remaining_tasks.len();
if num_tables > 0 {
let chunk_size = self
.context
.table_metadata_manager
.create_logical_tables_metadata_chunk_size();
if num_tables > chunk_size {
let chunks = remaining_tasks
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
self.context
.table_metadata_manager
.create_logical_tables_metadata(chunk)
.await?;
}
} else {
self.context
.table_metadata_manager
.create_logical_tables_metadata(remaining_tasks)
.await?;
}
}
// The `table_id` MUST be collected after the [Prepare::Prepare],
// ensures the all `table_id`s have been allocated.
let table_ids = self
.data
.tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
info!(
"Created {num_tables} tables {table_ids:?} metadata for physical table {}",
self.data.physical_table_id
);
Ok(table_ids)
}
}

View File

@@ -403,6 +403,9 @@ pub enum Error {
#[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
AlterLogicalTablesInvalidArguments { err_msg: String, location: Location },
#[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
CreateLogicalTablesInvalidArguments { err_msg: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -463,7 +466,8 @@ impl ErrorExt for Error {
| PrimaryKeyNotFound { .. }
| EmptyKey { .. }
| InvalidEngineType { .. }
| AlterLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments,
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments,
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,

View File

@@ -329,22 +329,6 @@ impl StatementExecutor {
name: "alter table"
}
);
ensure!(
alter_table_exprs
.windows(2)
.all(|expr| expr[0].catalog_name == expr[1].catalog_name),
DdlWithMultiCatalogsSnafu {
ddl_name: "alter tables",
}
);
ensure!(
alter_table_exprs
.windows(2)
.all(|expr| expr[0].schema_name == expr[1].schema_name),
DdlWithMultiSchemasSnafu {
ddl_name: "alter tables",
}
);
self.alter_logical_tables_procedure(alter_table_exprs)
.await?;