feat: implement alter table procedure (#1878)

* feat: implement alter table procedure

* fix: fix uncaught error

* refactor: move fetch_table/s to table_routes.rs

* refactor: refactor error handling

* chore: apply suggestions from CR

* feat: switch to using alter table procedure

* feat: add table_version

* chore: apply suggestions from CR

* feat: introduce ddl_channel_manager

* chore: update greptime-proto
This commit is contained in:
Weny Xu
2023-07-13 11:41:46 +09:00
committed by GitHub
parent 9ff7670adf
commit 735c6390ca
29 changed files with 827 additions and 113 deletions

3
Cargo.lock generated
View File

@@ -4134,7 +4134,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0f30b7fcbf950f0596f1159373947a21ff737216#0f30b7fcbf950f0596f1159373947a21ff737216"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9b26cca2132df83916692cc6fc092a26a1a76f0b#9b26cca2132df83916692cc6fc092a26a1a76f0b"
dependencies = [
"prost",
"serde",
@@ -5235,6 +5235,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-grpc",
"common-grpc-expr",
"common-meta",
"common-procedure",
"common-procedure-test",

View File

@@ -74,7 +74,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0f30b7fcbf950f0596f1159373947a21ff737216" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b26cca2132df83916692cc6fc092a26a1a76f0b" }
itertools = "0.10"
lazy_static = "1.4"
parquet = "40.0"

View File

@@ -59,6 +59,8 @@ addr = "127.0.0.1:4004"
[meta_client_options]
metasrv_addrs = ["127.0.0.1:3002"]
timeout_millis = 3000
# Timeout for DDL requests sent to the metasrv, in milliseconds.
ddl_timeout_millis = 10000
connect_timeout_millis = 5000
tcp_nodelay = true

View File

@@ -206,6 +206,7 @@ mod tests {
metasrv_addrs = ["127.0.0.1:3002"]
timeout_millis = 3000
connect_timeout_millis = 5000
ddl_timeout_millis= 10000
tcp_nodelay = true
[wal]
@@ -259,10 +260,12 @@ mod tests {
timeout_millis,
connect_timeout_millis,
tcp_nodelay,
ddl_timeout_millis,
} = options.meta_client_options.unwrap();
assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
assert_eq!(5000, connect_timeout_millis);
assert_eq!(10000, ddl_timeout_millis);
assert_eq!(3000, timeout_millis);
assert!(tcp_nodelay);

View File

@@ -38,7 +38,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
match kind {
let alter_kind = match kind {
Kind::AddColumns(add_columns) => {
let add_column_requests = add_columns
.add_columns
@@ -61,45 +61,27 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
})
.collect::<Result<Vec<_>>>()?;
let alter_kind = AlterKind::AddColumns {
AlterKind::AddColumns {
columns: add_column_requests,
};
let request = AlterTableRequest {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
}
Kind::DropColumns(DropColumns { drop_columns }) => {
let alter_kind = AlterKind::DropColumns {
names: drop_columns.into_iter().map(|c| c.name).collect(),
};
let request = AlterTableRequest {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
}
}
Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns {
names: drop_columns.into_iter().map(|c| c.name).collect(),
},
Kind::RenameTable(RenameTable { new_table_name }) => {
let alter_kind = AlterKind::RenameTable { new_table_name };
let request = AlterTableRequest {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
AlterKind::RenameTable { new_table_name }
}
}
};
let request = AlterTableRequest {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
table_version: Some(expr.table_version),
};
Ok(request)
}
pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result<RawSchema> {

View File

@@ -16,11 +16,11 @@ use std::result;
use api::v1::meta::submit_ddl_task_request::Task;
use api::v1::meta::{
CreateTableTask as PbCreateTableTask, DropTableTask as PbDropTableTask, Partition,
SubmitDdlTaskRequest as PbSubmitDdlTaskRequest,
AlterTableTask as PbAlterTableTask, CreateTableTask as PbCreateTableTask,
DropTableTask as PbDropTableTask, Partition, SubmitDdlTaskRequest as PbSubmitDdlTaskRequest,
SubmitDdlTaskResponse as PbSubmitDdlTaskResponse,
};
use api::v1::{CreateTableExpr, DropTableExpr};
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr};
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -34,6 +34,7 @@ use crate::table_name::TableName;
/// A DDL task; each variant wraps the payload of one kind of DDL operation.
pub enum DdlTask {
/// A create-table operation.
CreateTable(CreateTableTask),
/// A drop-table operation.
DropTable(DropTableTask),
/// An alter-table operation.
AlterTable(AlterTableTask),
}
impl DdlTask {
@@ -58,6 +59,10 @@ impl DdlTask {
table_id,
})
}
pub fn new_alter_table(alter_table: AlterExpr) -> Self {
DdlTask::AlterTable(AlterTableTask { alter_table })
}
}
impl TryFrom<Task> for DdlTask {
@@ -68,7 +73,7 @@ impl TryFrom<Task> for DdlTask {
Ok(DdlTask::CreateTable(create_table.try_into()?))
}
Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
_ => todo!(),
Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
}
}
}
@@ -87,7 +92,6 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
create_table: Some(task.create_table),
partitions: task.partitions,
}),
DdlTask::DropTable(task) => Task::DropTableTask(PbDropTableTask {
drop_table: Some(DropTableExpr {
catalog_name: task.catalog,
@@ -96,7 +100,11 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
table_id: Some(api::v1::TableId { id: task.table_id }),
}),
}),
DdlTask::AlterTable(task) => Task::AlterTableTask(PbAlterTableTask {
alter_table: Some(task.alter_table),
}),
};
Ok(Self {
header: None,
task: Some(task),
@@ -260,6 +268,72 @@ impl<'de> Deserialize<'de> for CreateTableTask {
}
}
/// An alter-table DDL task: a thin wrapper around the gRPC [`AlterExpr`].
#[derive(Debug, PartialEq)]
pub struct AlterTableTask {
/// The alter expression describing which table to alter and how.
pub alter_table: AlterExpr,
}
impl AlterTableTask {
pub fn table_ref(&self) -> TableReference {
TableReference {
catalog: &self.alter_table.catalog_name,
schema: &self.alter_table.schema_name,
table: &self.alter_table.table_name,
}
}
pub fn table_name(&self) -> TableName {
let table = &self.alter_table;
TableName {
catalog_name: table.catalog_name.to_string(),
schema_name: table.schema_name.to_string(),
table_name: table.table_name.to_string(),
}
}
}
impl TryFrom<PbAlterTableTask> for AlterTableTask {
type Error = error::Error;
fn try_from(pb: PbAlterTableTask) -> Result<Self> {
let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
err_msg: "expected alter_table",
})?;
Ok(AlterTableTask { alter_table })
}
}
impl Serialize for AlterTableTask {
    /// Serializes the task as the protobuf-encoded bytes of a
    /// `PbAlterTableTask`.
    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let encoded = PbAlterTableTask {
            alter_table: Some(self.alter_table.clone()),
        }
        .encode_to_vec();

        serializer.serialize_bytes(&encoded)
    }
}
impl<'de> Deserialize<'de> for AlterTableTask {
fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let buf = Vec::<u8>::deserialize(deserializer)?;
let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
.map_err(|err| serde::de::Error::custom(err.to_string()))?;
let expr = AlterTableTask::try_from(expr)
.map_err(|err| serde::de::Error::custom(err.to_string()))?;
Ok(expr)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -18,7 +18,7 @@ use api::v1::meta::{
CreateRequest as PbCreateRequest, DeleteRequest as PbDeleteRequest, Partition as PbPartition,
Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute,
RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable,
TableRoute as PbTableRoute,
TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue,
};
use serde::{Deserialize, Serialize, Serializer};
use snafu::{OptionExt, ResultExt};
@@ -286,6 +286,18 @@ impl TableRoute {
}
}
impl TryFrom<PbTableRouteValue> for TableRoute {
type Error = error::Error;
fn try_from(pb: PbTableRouteValue) -> Result<Self> {
TableRoute::try_from_raw(
&pb.peers,
pb.table_route.context(error::InvalidProtoMsgSnafu {
err_msg: "expected table_route",
})?,
)
}
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Table {
pub id: u64,

View File

@@ -95,6 +95,7 @@ impl SqlHandler {
table_name: table_ref.table.to_string(),
table_id,
alter_kind,
table_version: None,
})
}
}

View File

@@ -125,6 +125,7 @@ async fn test_alter_table() {
alter_kind: AlterKind::RenameTable {
new_table_name: "foo".to_string(),
},
table_version: None,
};
let unsupported = table_engine

View File

@@ -32,12 +32,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Execute the operation timeout, source: {}", source))]
Timeout {
location: Location,
source: tokio::time::error::Elapsed,
},
#[snafu(display("Failed to handle heartbeat response, source: {}", source))]
HandleHeartbeatResponse {
location: Location,
@@ -646,8 +640,7 @@ impl ErrorExt for Error {
| Error::FindRegionRoute { .. }
| Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. }
| Error::VectorToGrpcColumn { .. }
| Error::Timeout { .. } => StatusCode::Internal,
| Error::VectorToGrpcColumn { .. } => StatusCode::Internal,
Error::IncompleteGrpcResult { .. }
| Error::ContextValueNotFound { .. }

View File

@@ -235,15 +235,23 @@ impl Instance {
))
.tcp_nodelay(meta_client_options.tcp_nodelay);
let ddl_channel_config = channel_config.clone().timeout(Duration::from_millis(
meta_client_options.ddl_timeout_millis,
));
let channel_manager = ChannelManager::with_config(channel_config);
channel_manager.start_channel_recycle();
let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config);
ddl_channel_manager.start_channel_recycle();
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Frontend)
.enable_router()
.enable_store()
.enable_heartbeat()
.enable_ddl()
.channel_manager(channel_manager)
.ddl_channel_manager(ddl_channel_manager)
.build();
meta_client
.start(&meta_client_options.metasrv_addrs)

View File

@@ -16,7 +16,6 @@ pub(crate) mod inserter;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
@@ -60,11 +59,9 @@ use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use store_api::storage::RegionNumber;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use table::requests::TableOptions;
use table::table::AlterContext;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
use tokio::time::timeout;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
@@ -458,7 +455,29 @@ impl DistInstance {
Ok(Output::AffectedRows(1))
}
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<Output> {
fn verify_alter(
&self,
table_id: TableId,
table_info: Arc<TableInfo>,
expr: AlterExpr,
) -> Result<()> {
let request: table::requests::AlterTableRequest =
common_grpc_expr::alter_expr_to_request(table_id, expr)
.context(AlterExprToRequestSnafu)?;
let AlterTableRequest { table_name, .. } = &request;
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
Ok(())
}
async fn handle_alter_table(&self, mut expr: AlterExpr) -> Result<Output> {
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
} else {
@@ -482,17 +501,19 @@ impl DistInstance {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?;
let request = common_grpc_expr::alter_expr_to_request(
table.table_info().ident.table_id,
expr.clone(),
)
.context(AlterExprToRequestSnafu)?;
let table_id = table.table_info().ident.table_id;
expr.table_id = Some(api::v1::TableId { id: table_id });
let mut context = AlterContext::with_capacity(1);
self.verify_alter(table_id, table.table_info(), expr.clone())?;
let _ = context.insert(expr);
let req = SubmitDdlTaskRequest {
task: DdlTask::new_alter_table(expr.clone()),
};
table.alter(context, &request).await.context(TableSnafu)?;
self.meta_client
.submit_ddl_task(req)
.await
.context(error::RequestMetaSnafu)?;
Ok(Output::AffectedRows(0))
}
@@ -510,14 +531,10 @@ impl DistInstance {
task: DdlTask::new_create_table(create_table.clone(), partitions, table_info),
};
timeout(
// TODO(weny): makes timeout configurable.
Duration::from_secs(10),
self.meta_client.submit_ddl_task(request),
)
.await
.context(error::TimeoutSnafu)?
.context(error::RequestMetaSnafu)
self.meta_client
.submit_ddl_task(request)
.await
.context(error::RequestMetaSnafu)
}
async fn drop_table_procedure(
@@ -534,14 +551,10 @@ impl DistInstance {
),
};
timeout(
// TODO(weny): makes timeout configurable.
Duration::from_secs(10),
self.meta_client.submit_ddl_task(request),
)
.await
.context(error::TimeoutSnafu)?
.context(error::RequestMetaSnafu)
self.meta_client
.submit_ddl_task(request)
.await
.context(error::RequestMetaSnafu)
}
async fn handle_dist_insert(

View File

@@ -349,6 +349,7 @@ impl DistTable {
table_name,
alter_kind,
table_id: _table_id,
..
} = request;
let alter_expr = context

View File

@@ -54,6 +54,7 @@ pub struct MetaClientBuilder {
enable_lock: bool,
enable_ddl: bool,
channel_manager: Option<ChannelManager>,
ddl_channel_manager: Option<ChannelManager>,
}
impl MetaClientBuilder {
@@ -107,6 +108,13 @@ impl MetaClientBuilder {
}
}
/// Sets the [`ChannelManager`] stored for the DDL client.
pub fn ddl_channel_manager(mut self, channel_manager: ChannelManager) -> Self {
    self.ddl_channel_manager = Some(channel_manager);
    self
}
pub fn build(self) -> MetaClient {
let mut client = if let Some(mgr) = self.channel_manager {
MetaClient::with_channel_manager(self.id, mgr)
@@ -133,6 +141,7 @@ impl MetaClientBuilder {
client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_ddl {
let mgr = self.ddl_channel_manager.unwrap_or(mgr);
client.ddl = Some(DdlClient::new(self.id, self.role, mgr));
}

View File

@@ -22,15 +22,22 @@ pub mod error;
/// Options used to configure the meta client.
pub struct MetaClientOptions {
/// Addresses of the metasrv instances.
pub metasrv_addrs: Vec<String>,
// NOTE(review): presumably the general request timeout in milliseconds — confirm.
pub timeout_millis: u64,
/// Timeout for DDL requests, in milliseconds. Falls back to
/// `default_ddl_timeout_millis()` when the field is absent from the
/// deserialized input.
#[serde(default = "default_ddl_timeout_millis")]
pub ddl_timeout_millis: u64,
// NOTE(review): presumably the connection timeout in milliseconds — confirm.
pub connect_timeout_millis: u64,
// NOTE(review): presumably toggles TCP_NODELAY on the underlying channel — confirm.
pub tcp_nodelay: bool,
}
/// Default value of `ddl_timeout_millis`: 10 000 ms.
fn default_ddl_timeout_millis() -> u64 {
    const DEFAULT_DDL_TIMEOUT_MILLIS: u64 = 10_000;
    DEFAULT_DDL_TIMEOUT_MILLIS
}
impl Default for MetaClientOptions {
fn default() -> Self {
Self {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout_millis: 3_000u64,
ddl_timeout_millis: default_ddl_timeout_millis(),
connect_timeout_millis: 5_000u64,
tcp_nodelay: true,
}

View File

@@ -18,12 +18,14 @@ common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-grpc-expr = { path = "../common/grpc-expr" }
common-meta = { path = "../common/meta" }
common-procedure = { path = "../common/procedure" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
dashmap = "5.4"
datatypes = { path = "../datatypes" }
derive_builder = "0.12"
etcd-client.workspace = true
futures.workspace = true
@@ -53,6 +55,5 @@ servers = { path = "../servers" }
[dev-dependencies]
chrono.workspace = true
common-procedure-test = { path = "../common/procedure-test" }
datatypes = { path = "../datatypes" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -16,12 +16,15 @@ use std::sync::Arc;
use client::client_manager::DatanodeClients;
use common_meta::key::TableMetadataManagerRef;
use common_meta::rpc::ddl::{CreateTableTask, DropTableTask};
use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask};
use common_meta::rpc::router::TableRoute;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use snafu::ResultExt;
use table::metadata::RawTableInfo;
use table::requests::AlterTableRequest;
use crate::error::{self, Result};
use crate::procedure::alter_table::AlterTableProcedure;
use crate::procedure::create_table::CreateTableProcedure;
use crate::procedure::drop_table::DropTableProcedure;
use crate::service::mailbox::MailboxRef;
@@ -33,8 +36,8 @@ pub struct DdlManager {
procedure_manager: ProcedureManagerRef,
kv_store: KvStoreRef,
datanode_clients: Arc<DatanodeClients>,
mailbox: MailboxRef,
server_addr: String,
pub(crate) mailbox: MailboxRef,
pub(crate) server_addr: String,
table_metadata_manager: TableMetadataManagerRef,
}
@@ -104,9 +107,45 @@ impl DdlManager {
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: DropTableProcedure::TYPE_NAME,
})?;
let context = self.create_context();
self.procedure_manager
.register_loader(
AlterTableProcedure::TYPE_NAME,
Box::new(move |json| {
let context = context.clone();
AlterTableProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: AlterTableProcedure::TYPE_NAME,
})
}
/// Wraps the alter-table task in an [`AlterTableProcedure`] with a random
/// procedure id and submits it, returning that id.
pub async fn submit_alter_table_task(
    &self,
    cluster_id: u64,
    alter_table_task: AlterTableTask,
    alter_table_request: AlterTableRequest,
    table_info: RawTableInfo,
) -> Result<ProcedureId> {
    let procedure = AlterTableProcedure::new(
        cluster_id,
        alter_table_task,
        alter_table_request,
        table_info,
        self.create_context(),
    );

    self.submit_procedure(ProcedureWithId::with_random_id(Box::new(procedure)))
        .await
}
pub async fn submit_create_table_task(
&self,
cluster_id: u64,

View File

@@ -30,6 +30,35 @@ pub enum Error {
source: JoinError,
},
#[snafu(display("Failed to convert grpc expr, source: {}", source))]
ConvertGrpcExpr {
location: Location,
source: common_grpc_expr::error::Error,
},
#[snafu(display(
"Failed to build table meta for table: {}, source: {}",
table_name,
source
))]
BuildTableMeta {
table_name: String,
source: table::metadata::TableMetaBuilderError,
location: Location,
},
#[snafu(display("Table occurs error, source: {}", source))]
Table {
location: Location,
source: table::error::Error,
},
#[snafu(display("Failed to convert RawTableInfo into TableInfo: {}", source))]
ConvertRawTableInfo {
location: Location,
source: datatypes::Error,
},
#[snafu(display("Failed to execute transaction: {}", msg))]
Txn { location: Location, msg: String },
@@ -315,7 +344,7 @@ pub enum Error {
source: common_procedure::Error,
},
#[snafu(display("Failed to recover procedure, source: {source}"))]
#[snafu(display("Failed to wait procedure done, source: {source}"))]
WaitProcedure {
location: Location,
source: common_procedure::Error,
@@ -477,6 +506,7 @@ impl ErrorExt for Error {
| Error::StartGrpc { .. }
| Error::Combine { .. }
| Error::NoEnoughAvailableDatanode { .. }
| Error::ConvertGrpcExpr { .. }
| Error::Join { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
@@ -502,8 +532,11 @@ impl ErrorExt for Error {
| Error::UnexpectedInstructionReply { .. }
| Error::Unexpected { .. }
| Error::Txn { .. }
| Error::TableIdChanged { .. } => StatusCode::Unexpected,
| Error::TableIdChanged { .. }
| Error::ConvertRawTableInfo { .. }
| Error::BuildTableMeta { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::Table { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Error::RecoverProcedure { source, .. }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod alter_table;
pub mod create_table;
pub mod drop_table;
pub mod region_failover;

View File

@@ -0,0 +1,454 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::vec;
use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use client::Database;
use common_meta::ident::TableIdent;
use common_meta::instruction::Instruction;
use common_meta::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use common_meta::rpc::ddl::AlterTableTask;
use common_meta::rpc::router::TableRoute;
use common_meta::table_name::TableName;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status,
};
use common_telemetry::debug;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableInfo};
use table::requests::{AlterKind, AlterTableRequest};
use super::utils::build_table_metadata_key;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::procedure::utils::handle_request_datanode_error;
use crate::service::mailbox::BroadcastChannel;
use crate::table_routes::fetch_table;
// TODO(weny): remove in following PRs.
// NOTE(review): presumably refers to the `allow(dead_code)` below — confirm.
#[allow(dead_code)]
/// Procedure that alters a table. It steps through the states of
/// `AlterTableState`: update the table metadata, alter the table on the
/// datanodes, then broadcast a table-cache invalidation.
pub struct AlterTableProcedure {
// Shared DDL context (kv store, datanode clients, mailbox, server address).
context: DdlContext,
// Serializable procedure state; this is what `dump` writes out as JSON.
data: AlterTableData,
}
// TODO(weny): removes in following PRs.
#[allow(dead_code)]
impl AlterTableProcedure {
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
/// Creates a procedure in its initial state for the given task, request
/// and current table info.
pub(crate) fn new(
    cluster_id: u64,
    task: AlterTableTask,
    alter_table_request: AlterTableRequest,
    table_info: RawTableInfo,
    context: DdlContext,
) -> Self {
    let data = AlterTableData::new(task, alter_table_request, table_info, cluster_id);
    Self { context, data }
}
pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(AlterTableProcedure { context, data })
}
/// Alters table on datanode.
async fn on_datanode_alter_table(&mut self) -> Result<Status> {
let AlterTableState::DatanodeAlterTable(ref table_route) = self.data.state else {
return error::UnexpectedSnafu {
violated: "expected DatanodeAlterTable",
}
.fail();
};
let table_ref = self.data.table_ref();
let clients = self.context.datanode_clients.clone();
let leaders = table_route.find_leaders();
let mut joins = Vec::with_capacity(leaders.len());
for datanode in leaders {
let client = clients.get_client(&datanode).await;
let client = Database::new(table_ref.catalog, table_ref.schema, client);
let expr = self.data.task.alter_table.clone();
joins.push(common_runtime::spawn_bg(async move {
debug!("Sending {:?} to {:?}", expr, client);
client
.alter(expr)
.await
.map_err(handle_request_datanode_error(datanode))
}));
}
let _ = join_all(joins)
.await
.into_iter()
.map(|e| e.context(error::JoinSnafu).flatten())
.collect::<Result<Vec<_>>>()?;
self.data.state = AlterTableState::Broadcast;
Ok(Status::executing(true))
}
/// Update table metadata for rename table operation.
///
/// Three cases are handled, in this order:
/// 1. The table is still stored under its old name: the global value and the
///    route value are moved to the keys of the new name in one transaction.
/// 2. The old name is gone but the new name exists: the stored table info
///    must equal `new_table_info`, otherwise an `Unexpected` error is
///    returned.
/// 3. Neither name exists: a `TableNotFound` error is returned.
///
/// On success the table route read from the store is returned.
async fn on_update_metadata_for_rename(
&self,
new_table_name: &str,
new_table_info: TableInfo,
) -> Result<TableRoute> {
let table_ref = self.data.table_ref();
let new_table_ref = TableReference {
catalog: table_ref.catalog,
schema: table_ref.schema,
table: new_table_name,
};
let table_id = self.data.table_info.ident.table_id;
// Case 1: the table is still stored under its old name.
if let Some((mut table_global_value, table_route_value)) =
fetch_table(&self.context.kv_store, table_ref).await?
{
let table_route = table_route_value
.clone()
.try_into()
.context(error::TableRouteConversionSnafu)?;
let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id);
let (new_table_global_key, new_table_route_key) =
build_table_metadata_key(new_table_ref, table_id);
table_global_value.table_info = new_table_info.into();
// The transaction is guarded only by the old route value being unchanged.
// NOTE(review): nothing here checks that the new keys are unused — confirm
// that a clash with an existing table of the new name is rejected earlier.
let txn = Txn::new()
.when(vec![
Compare::with_value(
table_route_key.to_string().into_bytes(),
CompareOp::Equal,
table_route_value.clone().into(),
),
// Disabled comparison of the table global value.
// NOTE(review): presumably disabled for the reason given in
// `on_update_metadata` (values cannot be compared directly) — confirm.
// Compare::with_value(
// table_global_key.to_string().into_bytes(),
// CompareOp::Equal,
// table_global_value
// .clone()
// .as_bytes()
// .context(error::InvalidCatalogValueSnafu)?,
// ),
])
// Delete both entries under the old name and put them under the new name.
.and_then(vec![
TxnOp::Delete(table_global_key.to_string().into_bytes()),
TxnOp::Delete(table_route_key.to_string().into_bytes()),
TxnOp::Put(
new_table_global_key.to_string().into_bytes(),
table_global_value
.clone()
.as_bytes()
.context(error::InvalidCatalogValueSnafu)?,
),
TxnOp::Put(
new_table_route_key.to_string().into_bytes(),
table_route_value.into(),
),
]);
let resp = self.context.kv_store.txn(txn).await?;
ensure!(
resp.succeeded,
error::TxnSnafu {
msg: "table metadata changed"
}
);
return Ok(table_route);
// Case 2: the old name is gone; check whether the table has already been renamed.
} else if let Some((table, route)) =
fetch_table(&self.context.kv_store, new_table_ref).await?
{
let table_route = route.try_into().context(error::TableRouteConversionSnafu)?;
// The stored info must be exactly the info this procedure would have written.
ensure!(
table.table_info == new_table_info.into(),
error::UnexpectedSnafu {
violated: "table metadata changed"
}
);
return Ok(table_route);
}
// Case 3: the table exists under neither name.
error::TableNotFoundSnafu {
name: table_ref.to_string(),
}
.fail()
}
/// Builds the table info that results from applying the alter request.
///
/// Converts the stored [`RawTableInfo`] into a [`TableInfo`], derives a new
/// table meta from the request's alter kind, and returns the info with that
/// meta and the ident version incremented by one.
///
/// # Errors
///
/// Returns an error when the raw table info cannot be converted, or when
/// the new table meta cannot be derived or built.
fn build_new_table_info(&self) -> Result<TableInfo> {
    // Builds new_meta
    let table_info = TableInfo::try_from(self.data.table_info.clone())
        .context(error::ConvertRawTableInfoSnafu)?;

    let table_ref = self.data.table_ref();
    let request = &self.data.alter_table_request;

    let new_meta = table_info
        .meta
        .builder_with_alter_kind(table_ref.table, &request.alter_kind)
        .context(error::TableSnafu)?
        .build()
        .with_context(|_| error::BuildTableMetaSnafu {
            table_name: table_ref.table,
        })?;

    // `table_info` is owned and not needed afterwards, so it is updated in
    // place instead of deep-cloning the whole structure.
    let mut new_info = table_info;
    new_info.ident.version += 1;
    new_info.meta = new_meta;

    Ok(new_info)
}
/// Update table metadata.
async fn on_update_metadata(&mut self) -> Result<Status> {
let request = &self.data.alter_table_request;
let table_id = self.data.table_info.ident.table_id;
let table_ref = self.data.table_ref();
let new_info = self.build_new_table_info()?;
debug!(
"starting update table: {} metadata, new table info {:?}",
table_ref.to_string(),
new_info
);
if let AlterKind::RenameTable { new_table_name } = &request.alter_kind {
let table_route = self
.on_update_metadata_for_rename(new_table_name, new_info)
.await?;
self.data.state = AlterTableState::DatanodeAlterTable(table_route);
return Ok(Status::executing(true));
}
if let Some((mut table_global_value, table_route_value)) =
fetch_table(&self.context.kv_store, table_ref).await?
{
let table_route = table_route_value
.clone()
.try_into()
.context(error::TableRouteConversionSnafu)?;
let new_raw_info: RawTableInfo = new_info.into();
// If the metadata already updated.
if table_global_value.table_info == new_raw_info {
debug!("table: {} metadata already updated", table_ref.to_string());
self.data.state = AlterTableState::DatanodeAlterTable(table_route);
return Ok(Status::executing(true));
}
let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id);
let txn = Txn::new().when(vec![
Compare::with_value(
table_route_key.to_string().into_bytes(),
CompareOp::Equal,
table_route_value.clone().into(),
),
// TODO(weny): due to unordered map, we cannot compare values directly.
// Compare::with_value(
// table_global_key.to_string().into_bytes(),
// CompareOp::Equal,
// table_global_value
// .clone()
// .as_bytes()
// .context(error::InvalidCatalogValueSnafu)?,
// ),
]);
table_global_value.table_info = new_raw_info;
let txn = txn.and_then(vec![TxnOp::Put(
table_global_key.to_string().into_bytes(),
table_global_value
.clone()
.as_bytes()
.context(error::InvalidCatalogValueSnafu)?,
)]);
let resp = self.context.kv_store.txn(txn).await?;
ensure!(
resp.succeeded,
error::TxnSnafu {
msg: "table metadata changed"
}
);
debug!("table: {} metadata updated", table_ref.to_string());
self.data.state = AlterTableState::DatanodeAlterTable(table_route);
Ok(Status::executing(true))
} else {
error::TableNotFoundSnafu {
name: table_ref.to_string(),
}
.fail()
}
}
/// Broadcasts the invalidating table cache instructions.
///
/// Builds an `InvalidateTableCache` instruction for the altered table,
/// wraps it in a JSON mailbox message and broadcasts it on the frontend
/// channel. This is the final step: it returns `Status::Done`.
async fn on_broadcast(&mut self) -> Result<Status> {
    let name = self.data.table_name();
    let info = &self.data.table_info;

    let instruction = Instruction::InvalidateTableCache(TableIdent {
        catalog: name.catalog_name,
        schema: name.schema_name,
        table: name.table_name,
        table_id: info.ident.table_id,
        engine: info.meta.engine.to_string(),
    });

    let sender = format!("Metasrv@{}", self.context.server_addr);
    let msg = MailboxMessage::json_message(
        "Invalidate table cache by alter table procedure",
        &sender,
        "Frontend broadcast",
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| error::SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    self.context
        .mailbox
        .broadcast(&BroadcastChannel::Frontend, &msg)
        .await?;

    Ok(Status::Done)
}
/// Returns the lock keys: the full name of the table being altered and,
/// for a rename, the full name under the new table name as well.
fn lock_key_inner(&self) -> Vec<String> {
    let table_ref = self.data.table_ref();
    let full_name = |table: &str| {
        common_catalog::format_full_table_name(table_ref.catalog, table_ref.schema, table)
    };

    let mut keys = vec![full_name(table_ref.table)];
    if let AlterKind::RenameTable { new_table_name } = &self.data.alter_table_request.alter_kind
    {
        keys.push(full_name(new_table_name));
    }

    keys
}
}
#[async_trait]
impl Procedure for AlterTableProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs the step matching the current state. A `RetryLater` error is
    /// reported as a retryable procedure error; every other error is
    /// reported as an external one.
    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let outcome = match self.data.state {
            AlterTableState::UpdateMetadata => self.on_update_metadata().await,
            AlterTableState::DatanodeAlterTable(_) => self.on_datanode_alter_table().await,
            AlterTableState::Broadcast => self.on_broadcast().await,
        };

        outcome.map_err(|e| {
            if matches!(e, error::Error::RetryLater { .. }) {
                ProcedureError::retry_later(e)
            } else {
                ProcedureError::external(e)
            }
        })
    }

    /// Dumps the procedure data as JSON.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.lock_key_inner())
    }
}
/// States of the alter table procedure. A new procedure starts in
/// `UpdateMetadata`; `DatanodeAlterTable` follows it and `Broadcast` is last.
#[derive(Debug, Serialize, Deserialize)]
enum AlterTableState {
/// Updates table metadata.
UpdateMetadata,
/// Datanode alters the table. Carries the table route whose leader
/// datanodes receive the alter request.
DatanodeAlterTable(TableRoute),
/// Broadcasts the invalidating table cache instruction.
Broadcast,
}
/// Serializable data of the alter table procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
// Current step of the procedure.
state: AlterTableState,
// The submitted task; its alter expression is what gets sent to datanodes.
task: AlterTableTask,
// The alter request passed in at creation; its alter kind drives the metadata update.
alter_table_request: AlterTableRequest,
// Initialized to `None` by `new`.
// NOTE(review): not read anywhere in the visible code — confirm whether still needed.
table_route: Option<TableRoute>,
// Table info passed in at creation; the new table info is built from it.
table_info: RawTableInfo,
cluster_id: u64,
}
impl AlterTableData {
    /// Creates the data in the initial `UpdateMetadata` state, with no
    /// table route recorded.
    pub fn new(
        task: AlterTableTask,
        alter_table_request: AlterTableRequest,
        table_info: RawTableInfo,
        cluster_id: u64,
    ) -> Self {
        let state = AlterTableState::UpdateMetadata;
        let table_route = None;

        Self {
            state,
            task,
            alter_table_request,
            table_route,
            table_info,
            cluster_id,
        }
    }

    /// Borrowed reference to the table targeted by the task.
    fn table_ref(&self) -> TableReference {
        let task = &self.task;
        task.table_ref()
    }

    /// Owned name of the table targeted by the task.
    fn table_name(&self) -> TableName {
        let task = &self.task;
        task.table_name()
    }
}

View File

@@ -17,8 +17,10 @@ use api::v1::meta::{
Table, TableRoute,
};
use api::v1::TableId;
use catalog::helper::TableGlobalKey;
use common_grpc_expr::alter_expr_to_request;
use common_meta::key::TableRouteKey;
use common_meta::rpc::ddl::{CreateTableTask, DdlTask, DropTableTask};
use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DdlTask, DropTableTask};
use common_meta::rpc::router;
use common_meta::table_name::TableName;
use common_telemetry::{info, warn};
@@ -32,7 +34,7 @@ use crate::ddl::DdlManagerRef;
use crate::error::{self, Result};
use crate::metasrv::{MetaSrv, SelectorContext, SelectorRef};
use crate::sequence::SequenceRef;
use crate::table_routes::get_table_route_value;
use crate::table_routes::{get_table_global_value, get_table_route_value};
#[async_trait::async_trait]
impl ddl_task_server::DdlTask for MetaSrv {
@@ -79,6 +81,15 @@ impl ddl_task_server::DdlTask for MetaSrv {
)
.await?
}
DdlTask::AlterTable(alter_table_task) => {
handle_alter_table_task(
header.cluster_id,
alter_table_task,
self.kv_store().clone(),
self.ddl_manager().clone(),
)
.await?
}
};
Ok(Response::new(resp))
@@ -229,7 +240,61 @@ async fn handle_drop_table_task(
.submit_drop_table_task(cluster_id, drop_table_task, table_route)
.await?;
info!("Table: {table_id} created via procedure_id {id:?}");
Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
..Default::default()
})
}
async fn handle_alter_table_task(
cluster_id: u64,
mut alter_table_task: AlterTableTask,
kv_store: KvStoreRef,
ddl_manager: DdlManagerRef,
) -> Result<SubmitDdlTaskResponse> {
let table_id = alter_table_task
.alter_table
.table_id
.as_ref()
.context(error::UnexpectedSnafu {
violated: "expected table id ",
})?
.id;
let mut alter_table_request =
alter_expr_to_request(table_id, alter_table_task.alter_table.clone())
.context(error::ConvertGrpcExprSnafu)?;
let table_ref = alter_table_task.table_ref();
let table_global_key = TableGlobalKey {
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
};
let table_global_value = get_table_global_value(&kv_store, &table_global_key)
.await?
.with_context(|| error::TableNotFoundSnafu {
name: table_ref.to_string(),
})?;
let table_info = table_global_value.table_info;
// Sets alter_table's table_version
alter_table_task.alter_table.table_version = table_info.ident.version;
alter_table_request.table_version = Some(table_info.ident.version);
let id = ddl_manager
.submit_alter_table_task(
cluster_id,
alter_table_task,
alter_table_request,
table_info,
)
.await?;
info!("Table: {table_id} is altering via procedure_id {id:?}");
Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),

View File

@@ -185,13 +185,13 @@ pub(crate) async fn fetch_tables(
// there won't be many keys, in fact, there is usually just one.
for tgk in keys {
let tgv = get_table_global_value(kv_store, &tgk).await?;
if tgv.is_none() {
let Some(tgv) = tgv else {
warn!("Table global value is absent: {}", tgk);
continue;
}
let tgv = tgv.unwrap();
};
let trk = table_route_key(tgv.table_id(), &tgk);
let trv = get_table_route_value(kv_store, &trk).await?;
tables.push((tgv, trv));

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::sync::Arc;
use async_trait::async_trait;
@@ -20,7 +21,7 @@ use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, St
use common_telemetry::logging;
use common_telemetry::metric::Timer;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use store_api::manifest::Manifest;
use store_api::storage::{AlterOperation, StorageEngine};
use table::engine::TableReference;
@@ -29,7 +30,7 @@ use table::requests::{AlterKind, AlterTableRequest};
use table::{Table, TableRef};
use crate::engine::MitoEngineInner;
use crate::error::{TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu};
use crate::error::{StaleVersionSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu};
use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList};
use crate::metrics;
use crate::table::MitoTable;
@@ -110,7 +111,7 @@ impl<S: StorageEngine> AlterMitoTable<S> {
table_name: table_ref.to_string(),
})?;
let info = table.table_info();
data.table_version = info.ident.version;
data.table_version = data.request.table_version.unwrap_or(info.ident.version);
Ok(AlterMitoTable {
data,
@@ -161,19 +162,23 @@ impl<S: StorageEngine> AlterMitoTable<S> {
/// Prepare table info.
fn on_prepare(&mut self) -> Result<Status> {
let current_info = self.table.table_info();
ensure!(
current_info.ident.version == self.data.table_version,
VersionChangedSnafu {
match current_info.ident.version.cmp(&self.data.table_version) {
Ordering::Greater => Ok(Status::Done),
Ordering::Less => Err(StaleVersionSnafu {
expect: self.data.table_version,
actual: current_info.ident.version,
current: current_info.ident.version,
}
);
.build()
.into()),
Ordering::Equal => {
// We don't check the table name in the table engine as it is the catalog
// manager's duty to ensure the table name is unused.
self.data.state = AlterTableState::EngineAlterTable;
// We don't check the table name in the table engine as it is the catalog
// manager's duty to ensure the table name is unused.
self.data.state = AlterTableState::EngineAlterTable;
Ok(Status::executing(true))
Ok(Status::executing(true))
}
}
}
/// Engine alters the table.

View File

@@ -548,6 +548,7 @@ fn new_add_columns_req(
},
],
},
table_version: None,
}
}
@@ -577,6 +578,7 @@ pub(crate) fn new_add_columns_req_with_location(
},
],
},
table_version: None,
}
}
@@ -672,6 +674,7 @@ async fn test_alter_table_remove_column() {
alter_kind: AlterKind::DropColumns {
names: vec![String::from("memory"), String::from("my_field")],
},
table_version: None,
};
let table = table_engine
.alter_table(&EngineContext::default(), req)
@@ -719,6 +722,7 @@ async fn test_alter_rename_table() {
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),
},
table_version: None,
};
let table = table_engine.alter_table(&ctx, req).await.unwrap();

View File

@@ -164,10 +164,11 @@ pub enum Error {
#[snafu(display("Invalid schema, source: {}", source))]
InvalidRawSchema { source: datatypes::error::Error },
#[snafu(display("Table version changed, expect: {}, actual: {}", expect, actual))]
VersionChanged {
#[snafu(display("Stale version found, expect: {}, current: {}", expect, current))]
StaleVersion {
expect: TableVersion,
actual: TableVersion,
current: TableVersion,
location: Location,
},
}
@@ -189,7 +190,7 @@ impl ErrorExt for Error {
| MissingTimestampIndex { .. }
| TableNotFound { .. }
| InvalidRawSchema { .. }
| VersionChanged { .. } => StatusCode::InvalidArguments,
| StaleVersion { .. } => StatusCode::InvalidArguments,
TableExists { .. } => StatusCode::TableAlreadyExists,

View File

@@ -129,6 +129,7 @@ pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest {
table_name: TABLE_NAME.to_string(),
table_id: TABLE_ID,
alter_kind,
table_version: None,
}
}

View File

@@ -317,6 +317,7 @@ mod tests {
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),
},
table_version: None,
};
let TestEnv {

View File

@@ -28,7 +28,7 @@ use crate::TableRef;
pub mod manager;
/// Represents a resolved path to a table of the form “catalog.schema.table”
#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableReference<'a> {
pub catalog: &'a str,
pub schema: &'a str,

View File

@@ -29,7 +29,7 @@ use store_api::storage::RegionNumber;
use crate::engine::TableReference;
use crate::error;
use crate::error::ParseTableOptionSnafu;
use crate::metadata::TableId;
use crate::metadata::{TableId, TableVersion};
pub const IMMUTABLE_TABLE_META_KEY: &str = "__private.immutable_table_meta";
pub const IMMUTABLE_TABLE_LOCATION_KEY: &str = "location";
@@ -164,6 +164,8 @@ pub struct AlterTableRequest {
pub table_name: String,
pub table_id: TableId,
pub alter_kind: AlterKind,
// None in standalone.
pub table_version: Option<TableVersion>,
}
impl AlterTableRequest {