From 735c6390ca0ff2266adf0046fc9e41c541ee576a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 13 Jul 2023 11:41:46 +0900 Subject: [PATCH] feat: implement alter table procedure (#1878) * feat: implement alter table procedure * fix: fix uncaught error * refactor: move fetch_table/s to table_routes.rs * refactor: refactor error handling * chore: apply suggestions from CR * feat: switch to using alter table procedure * feat: add table_version * chore: apply suggestions from CR * feat: introduce ddl_channel_manager * chore: update greptime-proto --- Cargo.lock | 3 +- Cargo.toml | 2 +- config/frontend.example.toml | 2 + src/cmd/src/datanode.rs | 3 + src/common/grpc-expr/src/alter.rs | 54 +-- src/common/meta/src/rpc/ddl.rs | 84 +++- src/common/meta/src/rpc/router.rs | 14 +- src/datanode/src/sql/alter.rs | 1 + src/file-table-engine/src/engine/tests.rs | 1 + src/frontend/src/error.rs | 9 +- src/frontend/src/instance.rs | 8 + src/frontend/src/instance/distributed.rs | 73 ++-- src/frontend/src/table.rs | 1 + src/meta-client/src/client.rs | 9 + src/meta-client/src/lib.rs | 7 + src/meta-srv/Cargo.toml | 3 +- src/meta-srv/src/ddl.rs | 45 ++- src/meta-srv/src/error.rs | 37 +- src/meta-srv/src/procedure.rs | 1 + src/meta-srv/src/procedure/alter_table.rs | 454 ++++++++++++++++++++++ src/meta-srv/src/service/ddl.rs | 71 +++- src/meta-srv/src/table_routes.rs | 6 +- src/mito/src/engine/procedure/alter.rs | 31 +- src/mito/src/engine/tests.rs | 4 + src/mito/src/error.rs | 9 +- src/mito/src/table/test_util.rs | 1 + src/table-procedure/src/alter.rs | 1 + src/table/src/engine.rs | 2 +- src/table/src/requests.rs | 4 +- 29 files changed, 827 insertions(+), 113 deletions(-) create mode 100644 src/meta-srv/src/procedure/alter_table.rs diff --git a/Cargo.lock b/Cargo.lock index 592a083000..cbeea590d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4134,7 +4134,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0f30b7fcbf950f0596f1159373947a21ff737216#0f30b7fcbf950f0596f1159373947a21ff737216" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9b26cca2132df83916692cc6fc092a26a1a76f0b#9b26cca2132df83916692cc6fc092a26a1a76f0b" dependencies = [ "prost", "serde", @@ -5235,6 +5235,7 @@ dependencies = [ "common-catalog", "common-error", "common-grpc", + "common-grpc-expr", "common-meta", "common-procedure", "common-procedure-test", diff --git a/Cargo.toml b/Cargo.toml index 40d301d220..de889a5179 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git etcd-client = "0.11" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0f30b7fcbf950f0596f1159373947a21ff737216" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b26cca2132df83916692cc6fc092a26a1a76f0b" } itertools = "0.10" lazy_static = "1.4" parquet = "40.0" diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 62fe78edef..bcab85dd9a 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -59,6 +59,8 @@ addr = "127.0.0.1:4004" [meta_client_options] metasrv_addrs = ["127.0.0.1:3002"] timeout_millis = 3000 +# DDL timeouts options. +ddl_timeout_millis = 10000 connect_timeout_millis = 5000 tcp_nodelay = true diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 3c58822996..279a8d0086 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -206,6 +206,7 @@ mod tests { metasrv_addrs = ["127.0.0.1:3002"] timeout_millis = 3000 connect_timeout_millis = 5000 + ddl_timeout_millis= 10000 tcp_nodelay = true [wal] @@ -259,10 +260,12 @@ mod tests { timeout_millis, connect_timeout_millis, tcp_nodelay, + ddl_timeout_millis, } = options.meta_client_options.unwrap(); assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr); assert_eq!(5000, connect_timeout_millis); + assert_eq!(10000, ddl_timeout_millis); assert_eq!(3000, timeout_millis); assert!(tcp_nodelay); diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index a98c54d756..2d467d790c 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -38,7 +38,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result { let add_column_requests = add_columns .add_columns @@ -61,45 +61,27 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result>>()?; - let alter_kind = AlterKind::AddColumns { + AlterKind::AddColumns { columns: add_column_requests, - }; - - let request = AlterTableRequest { - catalog_name, - schema_name, - table_name: expr.table_name, - table_id, - alter_kind, - }; - Ok(request) - } - Kind::DropColumns(DropColumns { drop_columns }) => { - let alter_kind = AlterKind::DropColumns { - names: drop_columns.into_iter().map(|c| c.name).collect(), - }; - - let request = AlterTableRequest { - catalog_name, - schema_name, - table_name: expr.table_name, - table_id, - alter_kind, - }; - Ok(request) + } } + Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns { + names: drop_columns.into_iter().map(|c| c.name).collect(), + }, Kind::RenameTable(RenameTable { new_table_name }) => { - let alter_kind = AlterKind::RenameTable { new_table_name }; - let request = AlterTableRequest { - catalog_name, - schema_name, - table_name: expr.table_name, - table_id, - alter_kind, - }; - Ok(request) + AlterKind::RenameTable { new_table_name } } - } + }; + + let request = AlterTableRequest { + catalog_name, + schema_name, + table_name: expr.table_name, + table_id, + alter_kind, + table_version: Some(expr.table_version), + }; + Ok(request) } pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result { diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index dce9c2f24d..1e8548aa34 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -16,11 +16,11 @@ use std::result; use api::v1::meta::submit_ddl_task_request::Task; use api::v1::meta::{ - CreateTableTask as PbCreateTableTask, DropTableTask as PbDropTableTask, Partition, - SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, + AlterTableTask as PbAlterTableTask, CreateTableTask as PbCreateTableTask, + DropTableTask as PbDropTableTask, Partition, SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, }; -use api::v1::{CreateTableExpr, DropTableExpr}; +use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr}; use prost::Message; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -34,6 +34,7 @@ use crate::table_name::TableName; pub enum DdlTask { CreateTable(CreateTableTask), DropTable(DropTableTask), + AlterTable(AlterTableTask), } impl DdlTask { @@ -58,6 +59,10 @@ impl DdlTask { table_id, }) } + + pub fn new_alter_table(alter_table: AlterExpr) -> Self { + DdlTask::AlterTable(AlterTableTask { alter_table }) + } } impl TryFrom for DdlTask { @@ -68,7 +73,7 @@ impl TryFrom for DdlTask { Ok(DdlTask::CreateTable(create_table.try_into()?)) } Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)), - _ => todo!(), + Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)), } } } @@ -87,7 +92,6 @@ impl TryFrom for PbSubmitDdlTaskRequest { create_table: Some(task.create_table), partitions: task.partitions, }), - DdlTask::DropTable(task) => Task::DropTableTask(PbDropTableTask { drop_table: Some(DropTableExpr { catalog_name: task.catalog, @@ -96,7 +100,11 @@ impl TryFrom for PbSubmitDdlTaskRequest { table_id: Some(api::v1::TableId { id: task.table_id }), }), }), + DdlTask::AlterTable(task) => Task::AlterTableTask(PbAlterTableTask { + alter_table: Some(task.alter_table), + }), }; + Ok(Self { header: None, task: Some(task), @@ -260,6 +268,72 @@ impl<'de> Deserialize<'de> for CreateTableTask { } } +#[derive(Debug, PartialEq)] +pub struct AlterTableTask { + pub alter_table: AlterExpr, +} + +impl AlterTableTask { + pub fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.alter_table.catalog_name, + schema: &self.alter_table.schema_name, + table: &self.alter_table.table_name, + } + } + + pub fn table_name(&self) -> TableName { + let table = &self.alter_table; + + TableName { + catalog_name: table.catalog_name.to_string(), + schema_name: table.schema_name.to_string(), + table_name: table.table_name.to_string(), + } + } +} + +impl TryFrom for AlterTableTask { + type Error = error::Error; + + fn try_from(pb: PbAlterTableTask) -> Result { + let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu { + err_msg: "expected alter_table", + })?; + + Ok(AlterTableTask { alter_table }) + } +} + +impl Serialize for AlterTableTask { + fn serialize(&self, serializer: S) -> result::Result + where + S: serde::Serializer, + { + let pb = PbAlterTableTask { + alter_table: Some(self.alter_table.clone()), + }; + let buf = pb.encode_to_vec(); + serializer.serialize_bytes(&buf) + } +} + +impl<'de> Deserialize<'de> for AlterTableTask { + fn deserialize(deserializer: D) -> result::Result + where + D: serde::Deserializer<'de>, + { + let buf = Vec::::deserialize(deserializer)?; + let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; + + let expr = AlterTableTask::try_from(expr) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; + + Ok(expr) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index a528e605b4..e2b44108fc 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -18,7 +18,7 @@ use api::v1::meta::{ CreateRequest as PbCreateRequest, DeleteRequest as PbDeleteRequest, Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute, RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable, - TableRoute as PbTableRoute, + TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue, }; use serde::{Deserialize, Serialize, Serializer}; use snafu::{OptionExt, ResultExt}; @@ -286,6 +286,18 @@ impl TableRoute { } } +impl TryFrom for TableRoute { + type Error = error::Error; + fn try_from(pb: PbTableRouteValue) -> Result { + TableRoute::try_from_raw( + &pb.peers, + pb.table_route.context(error::InvalidProtoMsgSnafu { + err_msg: "expected table_route", + })?, + ) + } +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct Table { pub id: u64, diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index c786c7f321..c6db4e6eeb 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -95,6 +95,7 @@ impl SqlHandler { table_name: table_ref.table.to_string(), table_id, alter_kind, + table_version: None, }) } } diff --git a/src/file-table-engine/src/engine/tests.rs b/src/file-table-engine/src/engine/tests.rs index 3ad89fcca8..e32b69a337 100644 --- a/src/file-table-engine/src/engine/tests.rs +++ b/src/file-table-engine/src/engine/tests.rs @@ -125,6 +125,7 @@ async fn test_alter_table() { alter_kind: AlterKind::RenameTable { new_table_name: "foo".to_string(), }, + table_version: None, }; let unsupported = table_engine diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 97ea18aa0f..19ed3c265f 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -32,12 +32,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Execute the operation timeout, source: {}", source))] - Timeout { - location: Location, - source: tokio::time::error::Elapsed, - }, - #[snafu(display("Failed to handle heartbeat response, source: {}", source))] HandleHeartbeatResponse { location: Location, @@ -646,8 +640,7 @@ impl ErrorExt for Error { | Error::FindRegionRoute { .. } | Error::BuildDfLogicalPlan { .. } | Error::BuildTableMeta { .. } - | Error::VectorToGrpcColumn { .. } - | Error::Timeout { .. } => StatusCode::Internal, + | Error::VectorToGrpcColumn { .. } => StatusCode::Internal, Error::IncompleteGrpcResult { .. } | Error::ContextValueNotFound { .. } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index c0a63f3f6e..2abb5dc8b6 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -235,15 +235,23 @@ impl Instance { )) .tcp_nodelay(meta_client_options.tcp_nodelay); + let ddl_channel_config = channel_config.clone().timeout(Duration::from_millis( + meta_client_options.ddl_timeout_millis, + )); + let channel_manager = ChannelManager::with_config(channel_config); channel_manager.start_channel_recycle(); + let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config); + ddl_channel_manager.start_channel_recycle(); + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Frontend) .enable_router() .enable_store() .enable_heartbeat() .enable_ddl() .channel_manager(channel_manager) + .ddl_channel_manager(ddl_channel_manager) .build(); meta_client .start(&meta_client_options.metasrv_addrs) diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 163f6655b4..21c4d00527 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -16,7 +16,6 @@ pub(crate) mod inserter; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use api::helper::ColumnDataTypeWrapper; use api::v1::ddl_request::{Expr as DdlExpr, Expr}; @@ -60,11 +59,9 @@ use sql::statements::statement::Statement; use sql::statements::{self, sql_value_to_value}; use store_api::storage::RegionNumber; use table::engine::TableReference; -use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; -use table::requests::TableOptions; -use table::table::AlterContext; +use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType}; +use table::requests::{AlterTableRequest, TableOptions}; use table::TableRef; -use tokio::time::timeout; use crate::catalog::FrontendCatalogManager; use crate::error::{ @@ -458,7 +455,29 @@ impl DistInstance { Ok(Output::AffectedRows(1)) } - async fn handle_alter_table(&self, expr: AlterExpr) -> Result { + fn verify_alter( + &self, + table_id: TableId, + table_info: Arc, + expr: AlterExpr, + ) -> Result<()> { + let request: table::requests::AlterTableRequest = + common_grpc_expr::alter_expr_to_request(table_id, expr) + .context(AlterExprToRequestSnafu)?; + + let AlterTableRequest { table_name, .. } = &request; + + let _ = table_info + .meta + .builder_with_alter_kind(table_name, &request.alter_kind) + .context(error::TableSnafu)? + .build() + .context(error::BuildTableMetaSnafu { table_name })?; + + Ok(()) + } + + async fn handle_alter_table(&self, mut expr: AlterExpr) -> Result { let catalog_name = if expr.catalog_name.is_empty() { DEFAULT_CATALOG_NAME } else { @@ -482,17 +501,19 @@ impl DistInstance { table_name: format_full_table_name(catalog_name, schema_name, table_name), })?; - let request = common_grpc_expr::alter_expr_to_request( - table.table_info().ident.table_id, - expr.clone(), - ) - .context(AlterExprToRequestSnafu)?; + let table_id = table.table_info().ident.table_id; + expr.table_id = Some(api::v1::TableId { id: table_id }); - let mut context = AlterContext::with_capacity(1); + self.verify_alter(table_id, table.table_info(), expr.clone())?; - let _ = context.insert(expr); + let req = SubmitDdlTaskRequest { + task: DdlTask::new_alter_table(expr.clone()), + }; - table.alter(context, &request).await.context(TableSnafu)?; + self.meta_client + .submit_ddl_task(req) + .await + .context(error::RequestMetaSnafu)?; Ok(Output::AffectedRows(0)) } @@ -510,14 +531,10 @@ impl DistInstance { task: DdlTask::new_create_table(create_table.clone(), partitions, table_info), }; - timeout( - // TODO(weny): makes timeout configurable. - Duration::from_secs(10), - self.meta_client.submit_ddl_task(request), - ) - .await - .context(error::TimeoutSnafu)? - .context(error::RequestMetaSnafu) + self.meta_client + .submit_ddl_task(request) + .await + .context(error::RequestMetaSnafu) } async fn drop_table_procedure( @@ -534,14 +551,10 @@ impl DistInstance { ), }; - timeout( - // TODO(weny): makes timeout configurable. - Duration::from_secs(10), - self.meta_client.submit_ddl_task(request), - ) - .await - .context(error::TimeoutSnafu)? - .context(error::RequestMetaSnafu) + self.meta_client + .submit_ddl_task(request) + .await + .context(error::RequestMetaSnafu) } async fn handle_dist_insert( diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 0e1fe35bbe..67c6279808 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -349,6 +349,7 @@ impl DistTable { table_name, alter_kind, table_id: _table_id, + .. } = request; let alter_expr = context diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 2768336cff..234288def4 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -54,6 +54,7 @@ pub struct MetaClientBuilder { enable_lock: bool, enable_ddl: bool, channel_manager: Option, + ddl_channel_manager: Option, } impl MetaClientBuilder { @@ -107,6 +108,13 @@ impl MetaClientBuilder { } } + pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self { + Self { + ddl_channel_manager: Some(channel_manager), + ..self + } + } + pub fn build(self) -> MetaClient { let mut client = if let Some(mgr) = self.channel_manager { MetaClient::with_channel_manager(self.id, mgr) @@ -133,6 +141,7 @@ impl MetaClientBuilder { client.lock = Some(LockClient::new(self.id, self.role, mgr.clone())); } if self.enable_ddl { + let mgr = self.ddl_channel_manager.unwrap_or(mgr); client.ddl = Some(DdlClient::new(self.id, self.role, mgr)); } diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 8260a737e0..3489bccbf7 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -22,15 +22,22 @@ pub mod error; pub struct MetaClientOptions { pub metasrv_addrs: Vec, pub timeout_millis: u64, + #[serde(default = "default_ddl_timeout_millis")] + pub ddl_timeout_millis: u64, pub connect_timeout_millis: u64, pub tcp_nodelay: bool, } +fn default_ddl_timeout_millis() -> u64 { + 10_000u64 +} + impl Default for MetaClientOptions { fn default() -> Self { Self { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], timeout_millis: 3_000u64, + ddl_timeout_millis: default_ddl_timeout_millis(), connect_timeout_millis: 5_000u64, tcp_nodelay: true, } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index ef1289d5b0..0cd4f5c95b 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -18,12 +18,14 @@ common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } +common-grpc-expr = { path = "../common/grpc-expr" } common-meta = { path = "../common/meta" } common-procedure = { path = "../common/procedure" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } dashmap = "5.4" +datatypes = { path = "../datatypes" } derive_builder = "0.12" etcd-client.workspace = true futures.workspace = true @@ -53,6 +55,5 @@ servers = { path = "../servers" } [dev-dependencies] chrono.workspace = true common-procedure-test = { path = "../common/procedure-test" } -datatypes = { path = "../datatypes" } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs index eeb8b89648..a71495bd27 100644 --- a/src/meta-srv/src/ddl.rs +++ b/src/meta-srv/src/ddl.rs @@ -16,12 +16,15 @@ use std::sync::Arc; use client::client_manager::DatanodeClients; use common_meta::key::TableMetadataManagerRef; -use common_meta::rpc::ddl::{CreateTableTask, DropTableTask}; +use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask}; use common_meta::rpc::router::TableRoute; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use snafu::ResultExt; +use table::metadata::RawTableInfo; +use table::requests::AlterTableRequest; use crate::error::{self, Result}; +use crate::procedure::alter_table::AlterTableProcedure; use crate::procedure::create_table::CreateTableProcedure; use crate::procedure::drop_table::DropTableProcedure; use crate::service::mailbox::MailboxRef; @@ -33,8 +36,8 @@ pub struct DdlManager { procedure_manager: ProcedureManagerRef, kv_store: KvStoreRef, datanode_clients: Arc, - mailbox: MailboxRef, - server_addr: String, + pub(crate) mailbox: MailboxRef, + pub(crate) server_addr: String, table_metadata_manager: TableMetadataManagerRef, } @@ -104,9 +107,45 @@ impl DdlManager { ) .context(error::RegisterProcedureLoaderSnafu { type_name: DropTableProcedure::TYPE_NAME, + })?; + + let context = self.create_context(); + + self.procedure_manager + .register_loader( + AlterTableProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + AlterTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(error::RegisterProcedureLoaderSnafu { + type_name: AlterTableProcedure::TYPE_NAME, }) } + pub async fn submit_alter_table_task( + &self, + cluster_id: u64, + alter_table_task: AlterTableTask, + alter_table_request: AlterTableRequest, + table_info: RawTableInfo, + ) -> Result { + let context = self.create_context(); + + let procedure = AlterTableProcedure::new( + cluster_id, + alter_table_task, + alter_table_request, + table_info, + context, + ); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + pub async fn submit_create_table_task( &self, cluster_id: u64, diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index eeb560b4dc..da9d51ae76 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -30,6 +30,35 @@ pub enum Error { source: JoinError, }, + #[snafu(display("Failed to convert grpc expr, source: {}", source))] + ConvertGrpcExpr { + location: Location, + source: common_grpc_expr::error::Error, + }, + + #[snafu(display( + "Failed to build table meta for table: {}, source: {}", + table_name, + source + ))] + BuildTableMeta { + table_name: String, + source: table::metadata::TableMetaBuilderError, + location: Location, + }, + + #[snafu(display("Table occurs error, source: {}", source))] + Table { + location: Location, + source: table::error::Error, + }, + + #[snafu(display("Failed to convert RawTableInfo into TableInfo: {}", source))] + ConvertRawTableInfo { + location: Location, + source: datatypes::Error, + }, + #[snafu(display("Failed to execute transaction: {}", msg))] Txn { location: Location, msg: String }, @@ -315,7 +344,7 @@ pub enum Error { source: common_procedure::Error, }, - #[snafu(display("Failed to recover procedure, source: {source}"))] + #[snafu(display("Failed to wait procedure done, source: {source}"))] WaitProcedure { location: Location, source: common_procedure::Error, @@ -477,6 +506,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::Combine { .. } | Error::NoEnoughAvailableDatanode { .. } + | Error::ConvertGrpcExpr { .. } | Error::Join { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } @@ -502,8 +532,11 @@ impl ErrorExt for Error { | Error::UnexpectedInstructionReply { .. } | Error::Unexpected { .. } | Error::Txn { .. } - | Error::TableIdChanged { .. } => StatusCode::Unexpected, + | Error::TableIdChanged { .. } + | Error::ConvertRawTableInfo { .. } + | Error::BuildTableMeta { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, + Error::Table { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } => source.status_code(), Error::RecoverProcedure { source, .. } diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index f2872cd3bb..643d7ffdde 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod alter_table; pub mod create_table; pub mod drop_table; pub mod region_failover; diff --git a/src/meta-srv/src/procedure/alter_table.rs b/src/meta-srv/src/procedure/alter_table.rs new file mode 100644 index 0000000000..9aed748941 --- /dev/null +++ b/src/meta-srv/src/procedure/alter_table.rs @@ -0,0 +1,454 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::vec; + +use api::v1::meta::MailboxMessage; +use async_trait::async_trait; +use client::Database; +use common_meta::ident::TableIdent; +use common_meta::instruction::Instruction; +use common_meta::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; +use common_meta::rpc::ddl::AlterTableTask; +use common_meta::rpc::router::TableRoute; +use common_meta::table_name::TableName; +use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, +}; +use common_telemetry::debug; +use futures::future::join_all; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use table::engine::TableReference; +use table::metadata::{RawTableInfo, TableInfo}; +use table::requests::{AlterKind, AlterTableRequest}; + +use super::utils::build_table_metadata_key; +use crate::ddl::DdlContext; +use crate::error::{self, Result}; +use crate::procedure::utils::handle_request_datanode_error; +use crate::service::mailbox::BroadcastChannel; +use crate::table_routes::fetch_table; + +// TODO(weny): removes in following PRs. +#[allow(dead_code)] +pub struct AlterTableProcedure { + context: DdlContext, + data: AlterTableData, +} + +// TODO(weny): removes in following PRs. +#[allow(dead_code)] +impl AlterTableProcedure { + pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable"; + + pub(crate) fn new( + cluster_id: u64, + task: AlterTableTask, + alter_table_request: AlterTableRequest, + table_info: RawTableInfo, + context: DdlContext, + ) -> Self { + Self { + context, + data: AlterTableData::new(task, alter_table_request, table_info, cluster_id), + } + } + + pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + + Ok(AlterTableProcedure { context, data }) + } + + /// Alters table on datanode. + async fn on_datanode_alter_table(&mut self) -> Result { + let AlterTableState::DatanodeAlterTable(ref table_route) = self.data.state else { + return error::UnexpectedSnafu { + violated: "expected DatanodeAlterTable", + } + .fail(); + }; + + let table_ref = self.data.table_ref(); + + let clients = self.context.datanode_clients.clone(); + let leaders = table_route.find_leaders(); + let mut joins = Vec::with_capacity(leaders.len()); + + for datanode in leaders { + let client = clients.get_client(&datanode).await; + let client = Database::new(table_ref.catalog, table_ref.schema, client); + let expr = self.data.task.alter_table.clone(); + joins.push(common_runtime::spawn_bg(async move { + debug!("Sending {:?} to {:?}", expr, client); + client + .alter(expr) + .await + .map_err(handle_request_datanode_error(datanode)) + })); + } + + let _ = join_all(joins) + .await + .into_iter() + .map(|e| e.context(error::JoinSnafu).flatten()) + .collect::>>()?; + + self.data.state = AlterTableState::Broadcast; + + Ok(Status::executing(true)) + } + + /// Update table metadata for rename table operation. + async fn on_update_metadata_for_rename( + &self, + new_table_name: &str, + new_table_info: TableInfo, + ) -> Result { + let table_ref = self.data.table_ref(); + let new_table_ref = TableReference { + catalog: table_ref.catalog, + schema: table_ref.schema, + table: new_table_name, + }; + let table_id = self.data.table_info.ident.table_id; + + // Check whether the table has already been renamed. + if let Some((mut table_global_value, table_route_value)) = + fetch_table(&self.context.kv_store, table_ref).await? + { + let table_route = table_route_value + .clone() + .try_into() + .context(error::TableRouteConversionSnafu)?; + + let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id); + + let (new_table_global_key, new_table_route_key) = + build_table_metadata_key(new_table_ref, table_id); + + table_global_value.table_info = new_table_info.into(); + + let txn = Txn::new() + .when(vec![ + Compare::with_value( + table_route_key.to_string().into_bytes(), + CompareOp::Equal, + table_route_value.clone().into(), + ), + // Compare::with_value( + // table_global_key.to_string().into_bytes(), + // CompareOp::Equal, + // table_global_value + // .clone() + // .as_bytes() + // .context(error::InvalidCatalogValueSnafu)?, + // ), + ]) + .and_then(vec![ + TxnOp::Delete(table_global_key.to_string().into_bytes()), + TxnOp::Delete(table_route_key.to_string().into_bytes()), + TxnOp::Put( + new_table_global_key.to_string().into_bytes(), + table_global_value + .clone() + .as_bytes() + .context(error::InvalidCatalogValueSnafu)?, + ), + TxnOp::Put( + new_table_route_key.to_string().into_bytes(), + table_route_value.into(), + ), + ]); + + let resp = self.context.kv_store.txn(txn).await?; + + ensure!( + resp.succeeded, + error::TxnSnafu { + msg: "table metadata changed" + } + ); + + return Ok(table_route); + } else if let Some((table, route)) = + fetch_table(&self.context.kv_store, new_table_ref).await? + { + let table_route = route.try_into().context(error::TableRouteConversionSnafu)?; + + ensure!( + table.table_info == new_table_info.into(), + error::UnexpectedSnafu { + violated: "table metadata changed" + } + ); + + return Ok(table_route); + } + + error::TableNotFoundSnafu { + name: table_ref.to_string(), + } + .fail() + } + + fn build_new_table_info(&self) -> Result { + // Builds new_meta + let table_info = TableInfo::try_from(self.data.table_info.clone()) + .context(error::ConvertRawTableInfoSnafu)?; + + let table_ref = self.data.table_ref(); + + let request = &self.data.alter_table_request; + + let new_meta = table_info + .meta + .builder_with_alter_kind(table_ref.table, &request.alter_kind) + .context(error::TableSnafu)? + .build() + .with_context(|_| error::BuildTableMetaSnafu { + table_name: table_ref.table.clone(), + })?; + + let mut new_info = table_info.clone(); + new_info.ident.version = table_info.ident.version + 1; + new_info.meta = new_meta; + + Ok(new_info) + } + + /// Update table metadata. + async fn on_update_metadata(&mut self) -> Result { + let request = &self.data.alter_table_request; + let table_id = self.data.table_info.ident.table_id; + let table_ref = self.data.table_ref(); + let new_info = self.build_new_table_info()?; + debug!( + "starting update table: {} metadata, new table info {:?}", + table_ref.to_string(), + new_info + ); + + if let AlterKind::RenameTable { new_table_name } = &request.alter_kind { + let table_route = self + .on_update_metadata_for_rename(new_table_name, new_info) + .await?; + + self.data.state = AlterTableState::DatanodeAlterTable(table_route); + return Ok(Status::executing(true)); + } + + if let Some((mut table_global_value, table_route_value)) = + fetch_table(&self.context.kv_store, table_ref).await? + { + let table_route = table_route_value + .clone() + .try_into() + .context(error::TableRouteConversionSnafu)?; + let new_raw_info: RawTableInfo = new_info.into(); + + // If the metadata already updated. + if table_global_value.table_info == new_raw_info { + debug!("table: {} metadata already updated", table_ref.to_string()); + + self.data.state = AlterTableState::DatanodeAlterTable(table_route); + return Ok(Status::executing(true)); + } + + let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id); + + let txn = Txn::new().when(vec![ + Compare::with_value( + table_route_key.to_string().into_bytes(), + CompareOp::Equal, + table_route_value.clone().into(), + ), + // TODO(weny): due to unordered map, we cannot compare values directly. + // Compare::with_value( + // table_global_key.to_string().into_bytes(), + // CompareOp::Equal, + // table_global_value + // .clone() + // .as_bytes() + // .context(error::InvalidCatalogValueSnafu)?, + // ), + ]); + + table_global_value.table_info = new_raw_info; + + let txn = txn.and_then(vec![TxnOp::Put( + table_global_key.to_string().into_bytes(), + table_global_value + .clone() + .as_bytes() + .context(error::InvalidCatalogValueSnafu)?, + )]); + + let resp = self.context.kv_store.txn(txn).await?; + + ensure!( + resp.succeeded, + error::TxnSnafu { + msg: "table metadata changed" + } + ); + + debug!("table: {} metadata updated", table_ref.to_string()); + + self.data.state = AlterTableState::DatanodeAlterTable(table_route); + + Ok(Status::executing(true)) + } else { + error::TableNotFoundSnafu { + name: table_ref.to_string(), + } + .fail() + } + } + + /// Broadcasts the invalidating table cache instructions. + async fn on_broadcast(&mut self) -> Result { + let table_name = self.data.table_name(); + + let table_ident = TableIdent { + catalog: table_name.catalog_name, + schema: table_name.schema_name, + table: table_name.table_name, + table_id: self.data.table_info.ident.table_id, + engine: self.data.table_info.meta.engine.to_string(), + }; + let instruction = Instruction::InvalidateTableCache(table_ident); + + let msg = &MailboxMessage::json_message( + "Invalidate table cache by alter table procedure", + &format!("Metasrv@{}", self.context.server_addr), + "Frontend broadcast", + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + self.context + .mailbox + .broadcast(&BroadcastChannel::Frontend, msg) + .await?; + + Ok(Status::Done) + } + + fn lock_key_inner(&self) -> Vec { + let table_ref = self.data.table_ref(); + let table_key = common_catalog::format_full_table_name( + table_ref.catalog, + table_ref.schema, + table_ref.table, + ); + let mut lock_key = vec![table_key]; + + if let AlterKind::RenameTable { new_table_name } = &self.data.alter_table_request.alter_kind + { + lock_key.push(common_catalog::format_full_table_name( + table_ref.catalog, + table_ref.schema, + new_table_name, + )) + } + + lock_key + } +} + +#[async_trait] +impl Procedure for AlterTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let error_handler = |e| { + if matches!(e, error::Error::RetryLater { .. }) { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + }; + + match self.data.state { + AlterTableState::UpdateMetadata => self.on_update_metadata().await, + AlterTableState::DatanodeAlterTable(_) => self.on_datanode_alter_table().await, + AlterTableState::Broadcast => self.on_broadcast().await, + } + .map_err(error_handler) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let key = self.lock_key_inner(); + + LockKey::new(key) + } +} + +#[derive(Debug, Serialize, Deserialize)] +enum AlterTableState { + /// Updates table metadata. + UpdateMetadata, + /// Datanode alters the table. + DatanodeAlterTable(TableRoute), + /// Broadcasts the invalidating table cache instruction. + Broadcast, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AlterTableData { + state: AlterTableState, + task: AlterTableTask, + alter_table_request: AlterTableRequest, + table_route: Option, + table_info: RawTableInfo, + cluster_id: u64, +} + +impl AlterTableData { + pub fn new( + task: AlterTableTask, + alter_table_request: AlterTableRequest, + table_info: RawTableInfo, + cluster_id: u64, + ) -> Self { + Self { + state: AlterTableState::UpdateMetadata, + task, + alter_table_request, + table_route: None, + table_info, + cluster_id, + } + } + + fn table_ref(&self) -> TableReference { + self.task.table_ref() + } + + fn table_name(&self) -> TableName { + self.task.table_name() + } +} diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs index 1d9cb57612..ceb346f91c 100644 --- a/src/meta-srv/src/service/ddl.rs +++ b/src/meta-srv/src/service/ddl.rs @@ -17,8 +17,10 @@ use api::v1::meta::{ Table, TableRoute, }; use api::v1::TableId; +use catalog::helper::TableGlobalKey; +use common_grpc_expr::alter_expr_to_request; use common_meta::key::TableRouteKey; -use common_meta::rpc::ddl::{CreateTableTask, DdlTask, DropTableTask}; +use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DdlTask, DropTableTask}; use common_meta::rpc::router; use common_meta::table_name::TableName; use common_telemetry::{info, warn}; @@ -32,7 +34,7 @@ use crate::ddl::DdlManagerRef; use crate::error::{self, Result}; use crate::metasrv::{MetaSrv, SelectorContext, SelectorRef}; use crate::sequence::SequenceRef; -use crate::table_routes::get_table_route_value; +use crate::table_routes::{get_table_global_value, get_table_route_value}; #[async_trait::async_trait] impl ddl_task_server::DdlTask for MetaSrv { @@ -79,6 +81,15 @@ impl ddl_task_server::DdlTask for MetaSrv { ) .await? } + DdlTask::AlterTable(alter_table_task) => { + handle_alter_table_task( + header.cluster_id, + alter_table_task, + self.kv_store().clone(), + self.ddl_manager().clone(), + ) + .await? + } }; Ok(Response::new(resp)) @@ -229,7 +240,61 @@ async fn handle_drop_table_task( .submit_drop_table_task(cluster_id, drop_table_task, table_route) .await?; - info!("Table: {table_id} created via procedure_id {id:?}"); + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + ..Default::default() + }) +} + +async fn handle_alter_table_task( + cluster_id: u64, + mut alter_table_task: AlterTableTask, + kv_store: KvStoreRef, + ddl_manager: DdlManagerRef, +) -> Result { + let table_id = alter_table_task + .alter_table + .table_id + .as_ref() + .context(error::UnexpectedSnafu { + violated: "expected table id ", + })? + .id; + + let mut alter_table_request = + alter_expr_to_request(table_id, alter_table_task.alter_table.clone()) + .context(error::ConvertGrpcExprSnafu)?; + + let table_ref = alter_table_task.table_ref(); + + let table_global_key = TableGlobalKey { + catalog_name: table_ref.catalog.to_string(), + schema_name: table_ref.schema.to_string(), + table_name: table_ref.table.to_string(), + }; + + let table_global_value = get_table_global_value(&kv_store, &table_global_key) + .await? + .with_context(|| error::TableNotFoundSnafu { + name: table_ref.to_string(), + })?; + + let table_info = table_global_value.table_info; + + // Sets alter_table's table_version + alter_table_task.alter_table.table_version = table_info.ident.version; + alter_table_request.table_version = Some(table_info.ident.version); + + let id = ddl_manager + .submit_alter_table_task( + cluster_id, + alter_table_task, + alter_table_request, + table_info, + ) + .await?; + + info!("Table: {table_id} is altering via procedure_id {id:?}"); Ok(SubmitDdlTaskResponse { key: id.to_string().into(), diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 8287bdf680..8d40f276d7 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -185,13 +185,13 @@ pub(crate) async fn fetch_tables( // there won't be many keys, in fact, there is usually just one. for tgk in keys { let tgv = get_table_global_value(kv_store, &tgk).await?; - if tgv.is_none() { + let Some(tgv) = tgv else { warn!("Table global value is absent: {}", tgk); continue; - } - let tgv = tgv.unwrap(); + }; let trk = table_route_key(tgv.table_id(), &tgk); + let trv = get_table_route_value(kv_store, &trk).await?; tables.push((tgv, trv)); diff --git a/src/mito/src/engine/procedure/alter.rs b/src/mito/src/engine/procedure/alter.rs index 1c337b97ff..ae9f450b35 100644 --- a/src/mito/src/engine/procedure/alter.rs +++ b/src/mito/src/engine/procedure/alter.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::sync::Arc; use async_trait::async_trait; @@ -20,7 +21,7 @@ use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, St use common_telemetry::logging; use common_telemetry::metric::Timer; use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use store_api::manifest::Manifest; use store_api::storage::{AlterOperation, StorageEngine}; use table::engine::TableReference; @@ -29,7 +30,7 @@ use table::requests::{AlterKind, AlterTableRequest}; use table::{Table, TableRef}; use crate::engine::MitoEngineInner; -use crate::error::{TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu}; +use crate::error::{StaleVersionSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu}; use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList}; use crate::metrics; use crate::table::MitoTable; @@ -110,7 +111,7 @@ impl AlterMitoTable { table_name: table_ref.to_string(), })?; let info = table.table_info(); - data.table_version = info.ident.version; + data.table_version = data.request.table_version.unwrap_or(info.ident.version); Ok(AlterMitoTable { data, @@ -161,19 +162,23 @@ impl AlterMitoTable { /// Prepare table info. fn on_prepare(&mut self) -> Result { let current_info = self.table.table_info(); - ensure!( - current_info.ident.version == self.data.table_version, - VersionChangedSnafu { + + match current_info.ident.version.cmp(&self.data.table_version) { + Ordering::Greater => Ok(Status::Done), + Ordering::Less => Err(StaleVersionSnafu { expect: self.data.table_version, - actual: current_info.ident.version, + current: current_info.ident.version, } - ); + .build() + .into()), + Ordering::Equal => { + // We don't check the table name in the table engine as it is the catalog + // manager's duty to ensure the table name is unused. + self.data.state = AlterTableState::EngineAlterTable; - // We don't check the table name in the table engine as it is the catalog - // manager's duty to ensure the table name is unused. - self.data.state = AlterTableState::EngineAlterTable; - - Ok(Status::executing(true)) + Ok(Status::executing(true)) + } + } } /// Engine alters the table. diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 96f649bdba..c924a97cc9 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -548,6 +548,7 @@ fn new_add_columns_req( }, ], }, + table_version: None, } } @@ -577,6 +578,7 @@ pub(crate) fn new_add_columns_req_with_location( }, ], }, + table_version: None, } } @@ -672,6 +674,7 @@ async fn test_alter_table_remove_column() { alter_kind: AlterKind::DropColumns { names: vec![String::from("memory"), String::from("my_field")], }, + table_version: None, }; let table = table_engine .alter_table(&EngineContext::default(), req) @@ -719,6 +722,7 @@ async fn test_alter_rename_table() { alter_kind: AlterKind::RenameTable { new_table_name: new_table_name.to_string(), }, + table_version: None, }; let table = table_engine.alter_table(&ctx, req).await.unwrap(); diff --git a/src/mito/src/error.rs b/src/mito/src/error.rs index 8641e60359..81693701dd 100644 --- a/src/mito/src/error.rs +++ b/src/mito/src/error.rs @@ -164,10 +164,11 @@ pub enum Error { #[snafu(display("Invalid schema, source: {}", source))] InvalidRawSchema { source: datatypes::error::Error }, - #[snafu(display("Table version changed, expect: {}, actual: {}", expect, actual))] - VersionChanged { + #[snafu(display("Stale version found, expect: {}, current: {}", expect, current))] + StaleVersion { expect: TableVersion, - actual: TableVersion, + current: TableVersion, + location: Location, }, } @@ -189,7 +190,7 @@ impl ErrorExt for Error { | MissingTimestampIndex { .. } | TableNotFound { .. } | InvalidRawSchema { .. } - | VersionChanged { .. } => StatusCode::InvalidArguments, + | StaleVersion { .. } => StatusCode::InvalidArguments, TableExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index c937958e7d..e439973e13 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -129,6 +129,7 @@ pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest { table_name: TABLE_NAME.to_string(), table_id: TABLE_ID, alter_kind, + table_version: None, } } diff --git a/src/table-procedure/src/alter.rs b/src/table-procedure/src/alter.rs index 33f4050350..9d30c54fc0 100644 --- a/src/table-procedure/src/alter.rs +++ b/src/table-procedure/src/alter.rs @@ -317,6 +317,7 @@ mod tests { alter_kind: AlterKind::RenameTable { new_table_name: new_table_name.to_string(), }, + table_version: None, }; let TestEnv { diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index b0e8ef2a3e..98f1c3a533 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -28,7 +28,7 @@ use crate::TableRef; pub mod manager; /// Represents a resolved path to a table of the form “catalog.schema.table” -#[derive(Debug, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct TableReference<'a> { pub catalog: &'a str, pub schema: &'a str, diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index c5c99bdba9..6e4bcd0e77 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -29,7 +29,7 @@ use store_api::storage::RegionNumber; use crate::engine::TableReference; use crate::error; use crate::error::ParseTableOptionSnafu; -use crate::metadata::TableId; +use crate::metadata::{TableId, TableVersion}; pub const IMMUTABLE_TABLE_META_KEY: &str = "__private.immutable_table_meta"; pub const IMMUTABLE_TABLE_LOCATION_KEY: &str = "location"; @@ -164,6 +164,8 @@ pub struct AlterTableRequest { pub table_name: String, pub table_id: TableId, pub alter_kind: AlterKind, + // None in standalone. + pub table_version: Option, } impl AlterTableRequest {